This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
     new 77e51cb7fa NIFI-15210 Fixed verify method of GCS processors and other GCP fixes
77e51cb7fa is described below
commit 77e51cb7fa615364836c55f56e8828214934c83b
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Wed Nov 12 17:22:29 2025 +0100
NIFI-15210 Fixed verify method of GCS processors and other GCP fixes
Signed-off-by: Pierre Villard <[email protected]>
This closes #10519.
---
.../processors/gcp/pubsub/ConsumeGCPubSub.java | 900 ++++++++---------
.../processors/gcp/pubsub/PublishGCPubSub.java | 1008 ++++++++++----------
.../gcp/storage/AbstractGCSProcessor.java | 12 +-
.../processors/gcp/storage/DeleteGCSObject.java | 6 +-
.../processors/gcp/storage/FetchGCSObject.java | 5 +-
.../nifi/processors/gcp/storage/ListGCSBucket.java | 13 -
.../nifi/processors/gcp/storage/PutGCSObject.java | 5 +-
.../gcp/drive/AbstractGoogleDriveIT.java | 2 +
.../processors/gcp/pubsub/AbstractGCPubSubIT.java | 95 +-
.../nifi/processors/gcp/storage/AbstractGCSIT.java | 3 +
.../processors/gcp/storage/FetchGCSObjectIT.java | 4 +-
11 files changed, 1023 insertions(+), 1030 deletions(-)
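The verify() implementations touched by this commit all follow NiFi's ConfigVerificationResult pattern: each configuration check appends one result carrying a step name, an outcome, and an explanation. As a minimal sketch of that pattern, assuming a hypothetical checkConnectivity(context) helper and "Check Connectivity" step name (neither is part of this commit):

    // Minimal sketch of the verification pattern used in the diffs below.
    // checkConnectivity() is a hypothetical stand-in for the real checks
    // (creating a client, testing IAM permissions, and so on).
    @Override
    public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog logger, final Map<String, String> attributes) {
        final List<ConfigVerificationResult> results = new ArrayList<>();
        try {
            checkConnectivity(context);
            results.add(new ConfigVerificationResult.Builder()
                    .verificationStepName("Check Connectivity")
                    .outcome(Outcome.SUCCESSFUL)
                    .explanation("Successfully reached the configured endpoint")
                    .build());
        } catch (final Exception e) {
            logger.error("Connectivity check failed", e);
            results.add(new ConfigVerificationResult.Builder()
                    .verificationStepName("Check Connectivity")
                    .outcome(Outcome.FAILED)
                    .explanation("Connectivity check failed: " + e.getMessage())
                    .build());
        }
        return results;
    }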
diff --git a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
index 5e118fe617..b85e99884b 100644
--- a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
+++ b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
@@ -1,450 +1,450 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.gcp.pubsub;
-
-import com.google.api.gax.core.FixedCredentialsProvider;
-import com.google.api.gax.rpc.ApiException;
-import com.google.api.pathtemplate.ValidationException;
-import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
-import com.google.cloud.pubsub.v1.stub.SubscriberStub;
-import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
-import com.google.iam.v1.TestIamPermissionsRequest;
-import com.google.pubsub.v1.AcknowledgeRequest;
-import com.google.pubsub.v1.ProjectSubscriptionName;
-import com.google.pubsub.v1.PullRequest;
-import com.google.pubsub.v1.PullResponse;
-import com.google.pubsub.v1.ReceivedMessage;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.ConfigVerificationResult;
-import org.apache.nifi.components.ConfigVerificationResult.Outcome;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.migration.PropertyConfiguration;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.gcp.pubsub.consume.OutputStrategy;
-import org.apache.nifi.processors.gcp.pubsub.consume.ProcessingStrategy;
-import org.apache.nifi.processors.gcp.pubsub.consume.PubSubMessageConverter;
-import org.apache.nifi.processors.gcp.pubsub.consume.RecordStreamPubSubMessageConverter;
-import org.apache.nifi.processors.gcp.pubsub.consume.WrapperRecordStreamPubSubMessageConverter;
-import org.apache.nifi.serialization.RecordReaderFactory;
-import org.apache.nifi.serialization.RecordSetWriterFactory;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
-import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
-import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_DESCRIPTION;
-import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
-import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION;
-import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
-import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
-import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION;
-import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
-import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
-import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
-import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION;
-import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE;
-import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SUBSCRIPTION_NAME_DESCRIPTION;
-
-@SeeAlso({ PublishGCPubSub.class })
-@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
-@Tags({ "google", "google-cloud", "gcp", "message", "pubsub", "consume" })
-@CapabilityDescription(
- """
-        Consumes messages from the configured Google Cloud PubSub subscription. The 'Batch Size' property specifies the maximum
-        number of messages that will be pulled from the subscription in a single request. The 'Processing Strategy' property
-        specifies if each message should be its own FlowFile or if messages should be grouped into a single FlowFile. Using the
-        Demarcator strategy will provide the best throughput when the format allows it. Using Record allows converting the data format
-        as well as enforcing a schema. Using the FlowFile strategy will generate one FlowFile per message and will have
-        the message's attributes as FlowFile attributes.
- """
-)
-@WritesAttributes(
- {
-                @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description = ACK_ID_DESCRIPTION),
-                @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE, description = SERIALIZED_SIZE_DESCRIPTION),
-                @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
-                @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = MSG_PUBLISH_TIME_DESCRIPTION),
-                @WritesAttribute(attribute = SUBSCRIPTION_NAME_ATTRIBUTE, description = SUBSCRIPTION_NAME_DESCRIPTION),
-                @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
- }
-)
-public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
-
-    private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("pubsub.subscriptions.consume");
-
-    public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
- .name("Subscription")
- .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
- .description("Name of the Google Cloud Pub/Sub Subscription")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
-    public static final PropertyDescriptor PROCESSING_STRATEGY = new PropertyDescriptor.Builder()
-            .name("Processing Strategy")
-            .description("Strategy for processing PubSub Records and writing serialized output to FlowFiles")
- .required(true)
- .allowableValues(ProcessingStrategy.class)
- .defaultValue(ProcessingStrategy.FLOW_FILE.getValue())
- .expressionLanguageSupported(NONE)
- .build();
-
-    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
- .name("Record Reader")
- .description("The Record Reader to use for incoming messages")
- .identifiesControllerService(RecordReaderFactory.class)
- .required(true)
- .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
- .build();
-
-    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
-            .name("Record Writer")
-            .description("The Record Writer to use in order to serialize the outgoing FlowFiles")
- .identifiesControllerService(RecordSetWriterFactory.class)
- .required(true)
- .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
- .build();
-
-    public static final PropertyDescriptor OUTPUT_STRATEGY = new PropertyDescriptor.Builder()
-            .name("Output Strategy")
-            .description("The format used to output the PubSub Record into a FlowFile Record.")
- .required(true)
- .defaultValue(OutputStrategy.USE_VALUE)
- .allowableValues(OutputStrategy.class)
- .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
- .build();
-
-    public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
- .name("Message Demarcator")
- .required(true)
- .addValidator(Validator.VALID)
- .description(
- """
-                    Since the PubSub client receives messages in batches, this Processor has an option to output FlowFiles
-                    which contain all the messages in a single batch. This property allows you to provide a string
-                    (interpreted as UTF-8) to use for demarcating multiple messages. To enter a special character
-                    such as 'new line', use CTRL+Enter or Shift+Enter depending on the OS.
- """)
- .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.DEMARCATOR)
- .build();
-
-    public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
-            .name("parse failure")
-            .description("If configured to use a Record Reader, a PubSub message that cannot be parsed using the configured Record Reader will be routed to this relationship")
- .build();
-
-    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
- GCP_CREDENTIALS_PROVIDER_SERVICE,
- PROJECT_ID,
- SUBSCRIPTION,
- BATCH_SIZE_THRESHOLD,
- PROCESSING_STRATEGY,
- RECORD_READER,
- RECORD_WRITER,
- OUTPUT_STRATEGY,
- MESSAGE_DEMARCATOR,
- API_ENDPOINT,
- PROXY_CONFIGURATION_SERVICE);
-
-    private static final Set<Relationship> SUCCESS_RELATIONSHIP = Set.of(REL_SUCCESS);
-    private static final Set<Relationship> SUCCESS_FAILURE_RELATIONSHIPS = Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
-
- protected SubscriberStub subscriber = null;
- private PullRequest pullRequest;
- protected volatile OutputStrategy outputStrategy;
- protected volatile ProcessingStrategy processingStrategy;
- private volatile boolean useReader;
- protected volatile String demarcatorValue;
-
-    private final AtomicReference<Exception> storedException = new AtomicReference<>();
-
- @Override
- public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return PROPERTY_DESCRIPTORS;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
-        return useReader ? SUCCESS_FAILURE_RELATIONSHIPS : SUCCESS_RELATIONSHIP;
- }
-
- @Override
-    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
- if (descriptor.equals(RECORD_READER)) {
- useReader = newValue != null;
- }
- }
-
- @Override
- @OnScheduled
- public void onScheduled(ProcessContext context) {
-        final Integer batchSize = context.getProperty(BATCH_SIZE_THRESHOLD).asInteger();
-
- pullRequest = PullRequest.newBuilder()
- .setMaxMessages(batchSize)
- .setSubscription(getSubscriptionName(context, null))
- .build();
-
- try {
- subscriber = getSubscriber(context);
- } catch (IOException e) {
- storedException.set(e);
- getLogger().error("Failed to create Google Cloud Subscriber", e);
- }
-
-        processingStrategy = context.getProperty(PROCESSING_STRATEGY).asAllowableValue(ProcessingStrategy.class);
-        outputStrategy = processingStrategy == ProcessingStrategy.RECORD ? context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class) : null;
-        demarcatorValue = processingStrategy == ProcessingStrategy.DEMARCATOR ? context.getProperty(MESSAGE_DEMARCATOR).getValue() : null;
- }
-
- @Override
-    public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
- final List<ConfigVerificationResult> results = new ArrayList<>();
- String subscriptionName = null;
- try {
- subscriptionName = getSubscriptionName(context, attributes);
- results.add(new ConfigVerificationResult.Builder()
- .verificationStepName("Parse Subscription Name")
- .outcome(Outcome.SUCCESSFUL)
- .explanation("Successfully parsed Subscription Name")
- .build());
- } catch (final ValidationException e) {
- verificationLogger.error("Failed to parse Subscription Name", e);
- results.add(new ConfigVerificationResult.Builder()
- .verificationStepName("Parse Subscription Name")
- .outcome(Outcome.FAILED)
-                    .explanation(String.format("Failed to parse Subscription Name: " + e.getMessage()))
- .build());
- }
- SubscriberStub subscriber = null;
- try {
- subscriber = getSubscriber(context);
- results.add(new ConfigVerificationResult.Builder()
- .verificationStepName("Create Subscriber")
- .outcome(Outcome.SUCCESSFUL)
- .explanation("Successfully created Subscriber")
- .build());
- } catch (final IOException e) {
- verificationLogger.error("Failed to create Subscriber", e);
- results.add(new ConfigVerificationResult.Builder()
- .verificationStepName("Create Subscriber")
- .outcome(Outcome.FAILED)
-                    .explanation(String.format("Failed to create Subscriber: " + e.getMessage()))
- .build());
- }
-
- if (subscriber != null && subscriptionName != null) {
- try {
-                final TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder().addAllPermissions(REQUIRED_PERMISSIONS).setResource(subscriptionName).build();
-                if (subscriber.testIamPermissionsCallable().call(request).getPermissionsCount() >= REQUIRED_PERMISSIONS.size()) {
-                    results.add(new ConfigVerificationResult.Builder()
-                            .verificationStepName("Test IAM Permissions")
-                            .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
-                            .explanation(String.format("Verified Subscription [%s] exists and the configured user has the correct permissions.", subscriptionName))
- .build());
- } else {
- results.add(new ConfigVerificationResult.Builder()
- .verificationStepName("Test IAM Permissions")
- .outcome(ConfigVerificationResult.Outcome.FAILED)
-                            .explanation(String.format("The configured user does not have the correct permissions on Subscription [%s].", subscriptionName))
- .build());
- }
- } catch (final ApiException e) {
-                verificationLogger.error("The configured user appears to have the correct permissions, but the following error was encountered", e);
- results.add(new ConfigVerificationResult.Builder()
- .verificationStepName("Test IAM Permissions")
- .outcome(ConfigVerificationResult.Outcome.FAILED)
-                        .explanation(String.format("The configured user appears to have the correct permissions, but the following error was encountered: " + e.getMessage()))
- .build());
- }
- }
- return results;
- }
-
- @OnStopped
- public void onStopped() {
- if (subscriber != null) {
- subscriber.shutdown();
- }
- }
-
- @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- if (subscriber == null) {
- if (storedException.get() != null) {
-                getLogger().error("Failed to create Google Cloud PubSub subscriber due to {}", storedException.get());
-            } else {
-                getLogger().error("Google Cloud PubSub Subscriber was not properly created. Yielding the processor...");
- }
-
- context.yield();
- return;
- }
-
-        final PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
- final List<String> ackIds = new ArrayList<>();
- final String subscriptionName = getSubscriptionName(context, null);
-        List<ReceivedMessage> receivedMessages = pullResponse.getReceivedMessagesList();
-
- switch (processingStrategy) {
-            case RECORD -> processInputRecords(context, session, receivedMessages, subscriptionName, ackIds);
-            case FLOW_FILE -> processInputFlowFile(session, receivedMessages, subscriptionName, ackIds);
-            case DEMARCATOR -> processInputDemarcator(session, receivedMessages, subscriptionName, ackIds);
- }
-
- session.commitAsync(() -> acknowledgeAcks(ackIds, subscriptionName));
- }
-
- @Override
- public void migrateProperties(PropertyConfiguration config) {
- super.migrateProperties(config);
-        config.renameProperty("gcp-pubsub-subscription", SUBSCRIPTION.getName());
- }
-
-    private void processInputDemarcator(final ProcessSession session, final List<ReceivedMessage> receivedMessages, final String subscriptionName,
-                                        final List<String> ackIds) {
-        final byte[] demarcator = demarcatorValue == null ? new byte[0] : demarcatorValue.getBytes(StandardCharsets.UTF_8);
- FlowFile flowFile = session.create();
-
- try {
- flowFile = session.write(flowFile, new OutputStreamCallback() {
- @Override
- public void process(OutputStream out) throws IOException {
- for (ReceivedMessage message : receivedMessages) {
- if (message.hasMessage()) {
-                            out.write(message.getMessage().getData().toByteArray());
- out.write(demarcator);
- ackIds.add(message.getAckId());
- }
- }
- }
- });
-            session.putAttribute(flowFile, SUBSCRIPTION_NAME_ATTRIBUTE, subscriptionName);
- } catch (Exception e) {
- ackIds.clear();
- session.remove(flowFile);
-            throw new ProcessException("Failed to write batch of messages in FlowFile", e);
- }
-
- if (flowFile.getSize() > 0) {
-            session.putAttribute(flowFile, "record.count", String.valueOf(ackIds.size()));
-            session.transfer(flowFile, REL_SUCCESS);
-            session.getProvenanceReporter().receive(flowFile, subscriptionName);
-            session.adjustCounter("Records Received from " + subscriptionName, ackIds.size(), false);
- } else {
- session.remove(flowFile);
- }
- }
-
-    private void processInputFlowFile(final ProcessSession session, final List<ReceivedMessage> receivedMessages, final String subscriptionName, final List<String> ackIds) {
- for (ReceivedMessage message : receivedMessages) {
- if (message.hasMessage()) {
- FlowFile flowFile = session.create();
-
- final Map<String, String> attributes = new HashMap<>();
- attributes.put(ACK_ID_ATTRIBUTE, message.getAckId());
-                attributes.put(SERIALIZED_SIZE_ATTRIBUTE, String.valueOf(message.getSerializedSize()));
-                attributes.put(MESSAGE_ID_ATTRIBUTE, message.getMessage().getMessageId());
-                attributes.put(MSG_ATTRIBUTES_COUNT_ATTRIBUTE, String.valueOf(message.getMessage().getAttributesCount()));
-                attributes.put(MSG_PUBLISH_TIME_ATTRIBUTE, String.valueOf(message.getMessage().getPublishTime().getSeconds()));
- attributes.put(SUBSCRIPTION_NAME_ATTRIBUTE, subscriptionName);
- attributes.putAll(message.getMessage().getAttributesMap());
-
- flowFile = session.putAllAttributes(flowFile, attributes);
-                flowFile = session.write(flowFile, out -> out.write(message.getMessage().getData().toByteArray()));
-
- ackIds.add(message.getAckId());
- session.transfer(flowFile, REL_SUCCESS);
-                session.getProvenanceReporter().receive(flowFile, subscriptionName);
- }
- }
- }
-
-    private void processInputRecords(final ProcessContext context, final ProcessSession session, final List<ReceivedMessage> receivedMessages, final String subscriptionName,
-                                     final List<String> ackIds) {
-        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
- PubSubMessageConverter converter = switch (outputStrategy) {
-            case USE_VALUE -> new RecordStreamPubSubMessageConverter(readerFactory, writerFactory, getLogger());
-            case USE_WRAPPER -> new WrapperRecordStreamPubSubMessageConverter(readerFactory, writerFactory, getLogger());
- };
-
-        converter.toFlowFiles(session, receivedMessages, ackIds, subscriptionName);
- }
-
-    private void acknowledgeAcks(final Collection<String> ackIds, final String subscriptionName) {
- if (ackIds == null || ackIds.isEmpty()) {
- return;
- }
-
- AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
- .addAllAckIds(ackIds)
- .setSubscription(subscriptionName)
- .build();
- subscriber.acknowledgeCallable().call(acknowledgeRequest);
- }
-
-    private String getSubscriptionName(final ProcessContext context, final Map<String, String> additionalAttributes) {
-        final String subscriptionName = context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions(additionalAttributes).getValue();
-        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions(additionalAttributes).getValue();
-
- if (subscriptionName.contains("/")) {
- return ProjectSubscriptionName.parse(subscriptionName).toString();
- } else {
-            return ProjectSubscriptionName.of(projectId, subscriptionName).toString();
- }
-
- }
-
-    protected SubscriberStub getSubscriber(final ProcessContext context) throws IOException {
- final String endpoint = context.getProperty(API_ENDPOINT).getValue();
-
-        final SubscriberStubSettings.Builder subscriberBuilder = SubscriberStubSettings.newBuilder()
-                .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
-                .setTransportChannelProvider(getTransportChannelProvider(context))
- .setEndpoint(endpoint);
-
- return GrpcSubscriberStub.create(subscriberBuilder.build());
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.pathtemplate.ValidationException;
+import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.iam.v1.TestIamPermissionsRequest;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.migration.PropertyConfiguration;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.pubsub.consume.OutputStrategy;
+import org.apache.nifi.processors.gcp.pubsub.consume.ProcessingStrategy;
+import org.apache.nifi.processors.gcp.pubsub.consume.PubSubMessageConverter;
+import org.apache.nifi.processors.gcp.pubsub.consume.RecordStreamPubSubMessageConverter;
+import org.apache.nifi.processors.gcp.pubsub.consume.WrapperRecordStreamPubSubMessageConverter;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_DESCRIPTION;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SUBSCRIPTION_NAME_DESCRIPTION;
+
+@SeeAlso({ PublishGCPubSub.class })
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({ "google", "google-cloud", "gcp", "message", "pubsub", "consume" })
+@CapabilityDescription(
+ """
+        Consumes messages from the configured Google Cloud PubSub subscription. The 'Batch Size' property specifies the maximum
+        number of messages that will be pulled from the subscription in a single request. The 'Processing Strategy' property
+        specifies if each message should be its own FlowFile or if messages should be grouped into a single FlowFile. Using the
+        Demarcator strategy will provide the best throughput when the format allows it. Using Record allows converting the data format
+        as well as enforcing a schema. Using the FlowFile strategy will generate one FlowFile per message and will have
+        the message's attributes as FlowFile attributes.
+ """
+)
+@WritesAttributes(
+ {
+                @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description = ACK_ID_DESCRIPTION),
+                @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE, description = SERIALIZED_SIZE_DESCRIPTION),
+                @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
+                @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = MSG_PUBLISH_TIME_DESCRIPTION),
+                @WritesAttribute(attribute = SUBSCRIPTION_NAME_ATTRIBUTE, description = SUBSCRIPTION_NAME_DESCRIPTION),
+                @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
+ }
+)
+public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
+
+    private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("pubsub.subscriptions.consume");
+
+    public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
+ .name("Subscription")
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .description("Name of the Google Cloud Pub/Sub Subscription")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .build();
+
+    public static final PropertyDescriptor PROCESSING_STRATEGY = new PropertyDescriptor.Builder()
+            .name("Processing Strategy")
+            .description("Strategy for processing PubSub Records and writing serialized output to FlowFiles")
+ .required(true)
+ .allowableValues(ProcessingStrategy.class)
+ .defaultValue(ProcessingStrategy.FLOW_FILE.getValue())
+ .expressionLanguageSupported(NONE)
+ .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+ .name("Record Reader")
+ .description("The Record Reader to use for incoming messages")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
+ .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("Record Writer")
+            .description("The Record Writer to use in order to serialize the outgoing FlowFiles")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(true)
+ .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
+ .build();
+
+    public static final PropertyDescriptor OUTPUT_STRATEGY = new PropertyDescriptor.Builder()
+            .name("Output Strategy")
+            .description("The format used to output the PubSub Record into a FlowFile Record.")
+ .required(true)
+ .defaultValue(OutputStrategy.USE_VALUE)
+ .allowableValues(OutputStrategy.class)
+ .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD)
+ .build();
+
+    public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
+ .name("Message Demarcator")
+ .required(true)
+ .addValidator(Validator.VALID)
+ .description(
+ """
+                    Since the PubSub client receives messages in batches, this Processor has an option to output FlowFiles
+                    which contain all the messages in a single batch. This property allows you to provide a string
+                    (interpreted as UTF-8) to use for demarcating multiple messages. To enter a special character
+                    such as 'new line', use CTRL+Enter or Shift+Enter depending on the OS.
+ """)
+ .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.DEMARCATOR)
+ .build();
+
+    public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
+            .name("parse failure")
+            .description("If configured to use a Record Reader, a PubSub message that cannot be parsed using the configured Record Reader will be routed to this relationship")
+ .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
+ GCP_CREDENTIALS_PROVIDER_SERVICE,
+ PROJECT_ID,
+ SUBSCRIPTION,
+ BATCH_SIZE_THRESHOLD,
+ PROCESSING_STRATEGY,
+ RECORD_READER,
+ RECORD_WRITER,
+ OUTPUT_STRATEGY,
+ MESSAGE_DEMARCATOR,
+ API_ENDPOINT,
+ PROXY_CONFIGURATION_SERVICE);
+
+    private static final Set<Relationship> SUCCESS_RELATIONSHIP = Set.of(REL_SUCCESS);
+    private static final Set<Relationship> SUCCESS_FAILURE_RELATIONSHIPS = Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
+
+ protected SubscriberStub subscriber = null;
+ private PullRequest pullRequest;
+ protected volatile OutputStrategy outputStrategy;
+ protected volatile ProcessingStrategy processingStrategy;
+ private volatile boolean useReader;
+ protected volatile String demarcatorValue;
+
+    private final AtomicReference<Exception> storedException = new AtomicReference<>();
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTY_DESCRIPTORS;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+        return useReader ? SUCCESS_FAILURE_RELATIONSHIPS : SUCCESS_RELATIONSHIP;
+ }
+
+ @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+ if (descriptor.equals(RECORD_READER)) {
+ useReader = newValue != null;
+ }
+ }
+
+ @Override
+ @OnScheduled
+ public void onScheduled(ProcessContext context) {
+        final Integer batchSize = context.getProperty(BATCH_SIZE_THRESHOLD).asInteger();
+
+ pullRequest = PullRequest.newBuilder()
+ .setMaxMessages(batchSize)
+ .setSubscription(getSubscriptionName(context, null))
+ .build();
+
+ try {
+ subscriber = getSubscriber(context);
+ } catch (IOException e) {
+ storedException.set(e);
+ getLogger().error("Failed to create Google Cloud Subscriber", e);
+ }
+
+        processingStrategy = context.getProperty(PROCESSING_STRATEGY).asAllowableValue(ProcessingStrategy.class);
+        outputStrategy = processingStrategy == ProcessingStrategy.RECORD ? context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class) : null;
+        demarcatorValue = processingStrategy == ProcessingStrategy.DEMARCATOR ? context.getProperty(MESSAGE_DEMARCATOR).getValue() : null;
+ }
+
+ @Override
+    public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+ final List<ConfigVerificationResult> results = new ArrayList<>();
+ String subscriptionName = null;
+ try {
+ subscriptionName = getSubscriptionName(context, attributes);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Parse Subscription Name")
+ .outcome(Outcome.SUCCESSFUL)
+ .explanation("Successfully parsed Subscription Name")
+ .build());
+ } catch (final ValidationException e) {
+ verificationLogger.error("Failed to parse Subscription Name", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Parse Subscription Name")
+ .outcome(Outcome.FAILED)
+                    .explanation(String.format("Failed to parse Subscription Name: " + e.getMessage()))
+ .build());
+ }
+ SubscriberStub subscriber = null;
+ try {
+ subscriber = getSubscriber(context);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Create Subscriber")
+ .outcome(Outcome.SUCCESSFUL)
+ .explanation("Successfully created Subscriber")
+ .build());
+ } catch (final IOException e) {
+ verificationLogger.error("Failed to create Subscriber", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Create Subscriber")
+ .outcome(Outcome.FAILED)
+                    .explanation(String.format("Failed to create Subscriber: " + e.getMessage()))
+ .build());
+ }
+
+ if (subscriber != null && subscriptionName != null) {
+ try {
+                final TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder().addAllPermissions(REQUIRED_PERMISSIONS).setResource(subscriptionName).build();
+                if (subscriber.testIamPermissionsCallable().call(request).getPermissionsCount() >= REQUIRED_PERMISSIONS.size()) {
+                    results.add(new ConfigVerificationResult.Builder()
+                            .verificationStepName("Test IAM Permissions")
+                            .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+                            .explanation(String.format("Verified Subscription [%s] exists and the configured user has the correct permissions.", subscriptionName))
+ .build());
+ } else {
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+ .outcome(ConfigVerificationResult.Outcome.FAILED)
+                            .explanation(String.format("The configured user does not have the correct permissions on Subscription [%s].", subscriptionName))
+ .build());
+ }
+ } catch (final ApiException e) {
+                verificationLogger.error("The configured user appears to have the correct permissions, but the following error was encountered", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+ .outcome(ConfigVerificationResult.Outcome.FAILED)
+                        .explanation(String.format("The configured user appears to have the correct permissions, but the following error was encountered: " + e.getMessage()))
+ .build());
+ }
+ }
+ return results;
+ }
+
+ @OnStopped
+ public void onStopped() {
+ if (subscriber != null) {
+ subscriber.shutdown();
+ }
+ }
+
+ @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ if (subscriber == null) {
+ if (storedException.get() != null) {
+                getLogger().error("Failed to create Google Cloud PubSub subscriber due to {}", storedException.get());
+            } else {
+                getLogger().error("Google Cloud PubSub Subscriber was not properly created. Yielding the processor...");
+ }
+
+ context.yield();
+ return;
+ }
+
+        final PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
+ final List<String> ackIds = new ArrayList<>();
+ final String subscriptionName = getSubscriptionName(context, null);
+        List<ReceivedMessage> receivedMessages = pullResponse.getReceivedMessagesList();
+
+ switch (processingStrategy) {
+            case RECORD -> processInputRecords(context, session, receivedMessages, subscriptionName, ackIds);
+            case FLOW_FILE -> processInputFlowFile(session, receivedMessages, subscriptionName, ackIds);
+            case DEMARCATOR -> processInputDemarcator(session, receivedMessages, subscriptionName, ackIds);
+ }
+
+ session.commitAsync(() -> acknowledgeAcks(ackIds, subscriptionName));
+ }
+
+ @Override
+ public void migrateProperties(PropertyConfiguration config) {
+ super.migrateProperties(config);
+        config.renameProperty("gcp-pubsub-subscription", SUBSCRIPTION.getName());
+ }
+
+    private void processInputDemarcator(final ProcessSession session, final List<ReceivedMessage> receivedMessages, final String subscriptionName,
+                                        final List<String> ackIds) {
+        final byte[] demarcator = demarcatorValue == null ? new byte[0] : demarcatorValue.getBytes(StandardCharsets.UTF_8);
+ FlowFile flowFile = session.create();
+
+ try {
+ flowFile = session.write(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out) throws IOException {
+ for (ReceivedMessage message : receivedMessages) {
+ if (message.hasMessage()) {
+                            out.write(message.getMessage().getData().toByteArray());
+ out.write(demarcator);
+ ackIds.add(message.getAckId());
+ }
+ }
+ }
+ });
+            session.putAttribute(flowFile, SUBSCRIPTION_NAME_ATTRIBUTE, subscriptionName);
+ } catch (Exception e) {
+ ackIds.clear();
+ session.remove(flowFile);
+            throw new ProcessException("Failed to write batch of messages in FlowFile", e);
+ }
+
+ if (flowFile.getSize() > 0) {
+            session.putAttribute(flowFile, "record.count", String.valueOf(ackIds.size()));
+            session.transfer(flowFile, REL_SUCCESS);
+            session.getProvenanceReporter().receive(flowFile, subscriptionName);
+            session.adjustCounter("Records Received from " + subscriptionName, ackIds.size(), false);
+ } else {
+ session.remove(flowFile);
+ }
+ }
+
+    private void processInputFlowFile(final ProcessSession session, final List<ReceivedMessage> receivedMessages, final String subscriptionName, final List<String> ackIds) {
+ for (ReceivedMessage message : receivedMessages) {
+ if (message.hasMessage()) {
+ FlowFile flowFile = session.create();
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(ACK_ID_ATTRIBUTE, message.getAckId());
+                attributes.put(SERIALIZED_SIZE_ATTRIBUTE, String.valueOf(message.getSerializedSize()));
+                attributes.put(MESSAGE_ID_ATTRIBUTE, message.getMessage().getMessageId());
+                attributes.put(MSG_ATTRIBUTES_COUNT_ATTRIBUTE, String.valueOf(message.getMessage().getAttributesCount()));
+                attributes.put(MSG_PUBLISH_TIME_ATTRIBUTE, String.valueOf(message.getMessage().getPublishTime().getSeconds()));
+ attributes.put(SUBSCRIPTION_NAME_ATTRIBUTE, subscriptionName);
+ attributes.putAll(message.getMessage().getAttributesMap());
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
+                flowFile = session.write(flowFile, out -> out.write(message.getMessage().getData().toByteArray()));
+
+ ackIds.add(message.getAckId());
+ session.transfer(flowFile, REL_SUCCESS);
+                session.getProvenanceReporter().receive(flowFile, subscriptionName);
+ }
+ }
+ }
+
+    private void processInputRecords(final ProcessContext context, final ProcessSession session, final List<ReceivedMessage> receivedMessages, final String subscriptionName,
+                                     final List<String> ackIds) {
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+ PubSubMessageConverter converter = switch (outputStrategy) {
+            case USE_VALUE -> new RecordStreamPubSubMessageConverter(readerFactory, writerFactory, getLogger());
+            case USE_WRAPPER -> new WrapperRecordStreamPubSubMessageConverter(readerFactory, writerFactory, getLogger());
+ };
+
+        converter.toFlowFiles(session, receivedMessages, ackIds, subscriptionName);
+ }
+
+    private void acknowledgeAcks(final Collection<String> ackIds, final String subscriptionName) {
+ if (ackIds == null || ackIds.isEmpty()) {
+ return;
+ }
+
+ AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
+ .addAllAckIds(ackIds)
+ .setSubscription(subscriptionName)
+ .build();
+ subscriber.acknowledgeCallable().call(acknowledgeRequest);
+ }
+
+    private String getSubscriptionName(final ProcessContext context, final Map<String, String> additionalAttributes) {
+        final String subscriptionName = context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions(additionalAttributes).getValue();
+        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions(additionalAttributes).getValue();
+
+ if (subscriptionName.contains("/")) {
+ return ProjectSubscriptionName.parse(subscriptionName).toString();
+ } else {
+            return ProjectSubscriptionName.of(projectId, subscriptionName).toString();
+ }
+
+ }
+
+    protected SubscriberStub getSubscriber(final ProcessContext context) throws IOException {
+        final String endpoint = context.getProperty(API_ENDPOINT).evaluateAttributeExpressions().getValue();
+
+        final SubscriberStubSettings.Builder subscriberBuilder = SubscriberStubSettings.newBuilder()
+                .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
+                .setTransportChannelProvider(getTransportChannelProvider(context))
+ .setEndpoint(endpoint);
+
+ return GrpcSubscriberStub.create(subscriberBuilder.build());
+ }
+}
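The only functional change in ConsumeGCPubSub.java above is in getSubscriber(): the API Endpoint property is now passed through evaluateAttributeExpressions() before the value is applied to the stub settings, so an endpoint configured with environment-scoped Expression Language (say, ${PUBSUB_ENDPOINT}, a hypothetical variable name) resolves before use. A minimal before/after sketch of just that line, with the surrounding method as in the diff:

    // Before (removed above): the raw property value was used as the endpoint.
    final String endpoint = context.getProperty(API_ENDPOINT).getValue();

    // After (added above): Expression Language is evaluated first.
    final String endpoint = context.getProperty(API_ENDPOINT).evaluateAttributeExpressions().getValue();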
diff --git a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
index 83bb3c6ee9..66d51038a9 100644
--- a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
+++ b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
@@ -1,504 +1,504 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.gcp.pubsub;
-
-import com.google.api.core.ApiFuture;
-import com.google.api.core.ApiFutureCallback;
-import com.google.api.core.ApiFutures;
-import com.google.api.gax.batching.BatchingSettings;
-import com.google.api.gax.core.FixedCredentialsProvider;
-import com.google.api.gax.rpc.ApiException;
-import com.google.api.gax.core.FixedExecutorProvider;
-import com.google.cloud.pubsub.v1.Publisher;
-import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub;
-import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.iam.v1.TestIamPermissionsRequest;
-import com.google.iam.v1.TestIamPermissionsResponse;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Timestamp;
-import com.google.pubsub.v1.ProjectTopicName;
-import com.google.pubsub.v1.PubsubMessage;
-import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.SystemResource;
-import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.ConfigVerificationResult;
-import org.apache.nifi.components.ConfigVerificationResult.Outcome;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.migration.PropertyConfiguration;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.gcp.pubsub.publish.FlowFileResult;
-import org.apache.nifi.processors.gcp.pubsub.publish.MessageDerivationStrategy;
-import org.apache.nifi.processors.gcp.pubsub.publish.TrackedApiFutureCallback;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordReaderFactory;
-import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.record.PushBackRecordSet;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;
-import org.apache.nifi.util.StopWatch;
-import org.threeten.bp.Duration;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
-import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
-import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
-import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.RECORDS_ATTRIBUTE;
-import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.RECORDS_DESCRIPTION;
-import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
-import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
-
-@SeeAlso({ConsumeGCPubSub.class})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "publish"})
-@CapabilityDescription("Publishes the content of the incoming FlowFile to the configured Google Cloud PubSub topic. The processor supports dynamic properties." +
-        " If any dynamic properties are present, they will be sent along with the message in the form of 'attributes'.")
-@DynamicProperty(name = "Attribute name", value = "Value to be set to the attribute",
-        description = "Attributes to be set for the outgoing Google Cloud PubSub message", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-@WritesAttributes({
-        @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = MESSAGE_ID_DESCRIPTION),
-        @WritesAttribute(attribute = RECORDS_ATTRIBUTE, description = RECORDS_DESCRIPTION),
-        @WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description = TOPIC_NAME_DESCRIPTION)
-})
-@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The entirety of the FlowFile's content "
-        + "will be read into memory to be sent as a PubSub message.")
-public class PublishGCPubSub extends AbstractGCPubSubProcessor {
-    private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("pubsub.topics.publish");
- private static final String TRANSIT_URI_FORMAT_STRING = "gcp://%s";
-
-    public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
- .name("Input Batch Size")
-            .description("Maximum number of FlowFiles processed for each Processor invocation")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
- .addValidator(StandardValidators.NUMBER_VALIDATOR)
- .defaultValue("100")
- .build();
-
-    public static final PropertyDescriptor MESSAGE_DERIVATION_STRATEGY = new PropertyDescriptor.Builder()
-            .name("Message Derivation Strategy")
-            .description("The strategy used to publish the incoming FlowFile to the Google Cloud PubSub endpoint.")
-            .required(true)
-            .defaultValue(MessageDerivationStrategy.FLOWFILE_ORIENTED.getValue())
- .allowableValues(MessageDerivationStrategy.class)
- .build();
-
-    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
- .name("Record Reader")
- .description("The Record Reader to use for incoming FlowFiles")
- .identifiesControllerService(RecordReaderFactory.class)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .dependsOn(MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue())
- .required(true)
- .build();
-
-    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
- .name("Record Writer")
-            .description("The Record Writer to use in order to serialize the data before sending to GCPubSub endpoint")
- .identifiesControllerService(RecordSetWriterFactory.class)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .dependsOn(MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue())
- .required(true)
- .build();
-
-    public static final PropertyDescriptor MAX_MESSAGE_SIZE = new PropertyDescriptor.Builder()
-            .name("Maximum Message Size")
-            .description("The maximum size of a Google PubSub message in bytes. Defaults to 1 MB (1048576 bytes)")
-            .dependsOn(MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.FLOWFILE_ORIENTED.getValue())
- .required(true)
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .defaultValue("1 MB")
- .build();
-
-    public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder()
- .name("Topic Name")
- .description("Name of the Google Cloud PubSub Topic")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
- public static final Relationship REL_RETRY = new Relationship.Builder()
- .name("retry")
-            .description("FlowFiles are routed to this relationship if the Google Cloud Pub/Sub operation fails but attempting the operation again may succeed.")
- .build();
-
-    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
- GCP_CREDENTIALS_PROVIDER_SERVICE,
- PROJECT_ID,
- TOPIC_NAME,
- MESSAGE_DERIVATION_STRATEGY,
- RECORD_READER,
- RECORD_WRITER,
- MAX_BATCH_SIZE,
- MAX_MESSAGE_SIZE,
- BATCH_SIZE_THRESHOLD,
- BATCH_BYTES_THRESHOLD,
- BATCH_DELAY_THRESHOLD,
- API_ENDPOINT,
- PROXY_CONFIGURATION_SERVICE
- );
-
- public static final Set<Relationship> RELATIONSHIPS = Set.of(
- REL_SUCCESS,
- REL_FAILURE,
- REL_RETRY
- );
-
- protected Publisher publisher = null;
-
- @Override
- public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return PROPERTY_DESCRIPTORS;
- }
-
- @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
- return new PropertyDescriptor.Builder()
- .required(false)
- .name(propertyDescriptorName)
- .displayName(propertyDescriptorName)
-                .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
-                .dynamic(true)
-                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return RELATIONSHIPS;
- }
-
- @Override
- @OnScheduled
- public void onScheduled(ProcessContext context) {
- try {
- publisher = getPublisherBuilder(context).build();
- } catch (IOException e) {
-            throw new ProcessException("Failed to create Google Cloud PubSub Publisher", e);
- }
- }
-
- @Override
-    public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
- final List<ConfigVerificationResult> results = new ArrayList<>();
- Publisher publisher = null;
- try {
- publisher = getPublisherBuilder(context).build();
- results.add(new ConfigVerificationResult.Builder()
- .verificationStepName("Create Publisher")
- .outcome(Outcome.SUCCESSFUL)
- .explanation("Successfully created Publisher")
- .build());
- } catch (final IOException e) {
- verificationLogger.error("Failed to create Publisher", e);
- results.add(new ConfigVerificationResult.Builder()
- .verificationStepName("Create Publisher")
- .outcome(Outcome.FAILED)
-                    .explanation(String.format("Failed to create Publisher: " + e.getMessage()))
- .build());
- }
-
- if (publisher != null) {
- try {
-                final PublisherStubSettings publisherStubSettings = PublisherStubSettings.newBuilder()
-                        .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
-                        .setTransportChannelProvider(getTransportChannelProvider(context))
- .build();
-
-                try (GrpcPublisherStub publisherStub = GrpcPublisherStub.create(publisherStubSettings)) {
-                    final String topicName = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
-                    final TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder()
- .addAllPermissions(REQUIRED_PERMISSIONS)
- .setResource(topicName)
- .build();
-                    final TestIamPermissionsResponse response = publisherStub.testIamPermissionsCallable().call(request);
-                    if (response.getPermissionsCount() >= REQUIRED_PERMISSIONS.size()) {
- results.add(new ConfigVerificationResult.Builder()
- .verificationStepName("Test IAM Permissions")
-                                .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
-                                .explanation(String.format("Verified Topic [%s] exists and the configured user has the correct permissions.", topicName))
- } else {
- results.add(new ConfigVerificationResult.Builder()
- .verificationStepName("Test IAM Permissions")
-
.outcome(ConfigVerificationResult.Outcome.FAILED)
- .explanation(String.format("The configured
user does not have the correct permissions on Topic [%s].", topicName))
- .build());
- }
- }
- } catch (final ApiException e) {
- verificationLogger.error("The configured user appears to have
the correct permissions, but the following error was encountered", e);
- results.add(new ConfigVerificationResult.Builder()
- .verificationStepName("Test IAM Permissions")
- .outcome(ConfigVerificationResult.Outcome.FAILED)
- .explanation(String.format("The configured user
appears to have the correct permissions, but the following error was
encountered: " + e.getMessage()))
- .build());
- } catch (final IOException e) {
- verificationLogger.error("The publisher stub could not be
created in order to test the permissions", e);
- results.add(new ConfigVerificationResult.Builder()
- .verificationStepName("Test IAM Permissions")
- .outcome(ConfigVerificationResult.Outcome.FAILED)
- .explanation(String.format("The publisher stub could
not be created in order to test the permissions: " + e.getMessage()))
- .build());
-
- }
- }
- return results;
- }
-
- @Override
- public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
- final StopWatch stopWatch = new StopWatch(true);
- final MessageDerivationStrategy inputStrategy =
MessageDerivationStrategy.valueOf(context.getProperty(MESSAGE_DERIVATION_STRATEGY).getValue());
- final int maxBatchSize =
context.getProperty(MAX_BATCH_SIZE).asInteger();
- final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
- if (flowFileBatch.isEmpty()) {
- context.yield();
- } else if
(MessageDerivationStrategy.FLOWFILE_ORIENTED.equals(inputStrategy)) {
- onTriggerFlowFileStrategy(context, session, stopWatch,
flowFileBatch);
- } else if
(MessageDerivationStrategy.RECORD_ORIENTED.equals(inputStrategy)) {
- onTriggerRecordStrategy(context, session, stopWatch,
flowFileBatch);
- } else {
- throw new IllegalStateException(inputStrategy.getValue());
- }
- }
-
- @Override
- public void migrateProperties(PropertyConfiguration config) {
- super.migrateProperties(config);
- config.renameProperty("gcp-pubsub-topic", TOPIC_NAME.getName());
- }
-
- private void onTriggerFlowFileStrategy(
- final ProcessContext context,
- final ProcessSession session,
- final StopWatch stopWatch,
- final List<FlowFile> flowFileBatch) throws ProcessException {
- final long maxMessageSize =
context.getProperty(MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).longValue();
-
- final Executor executor = MoreExecutors.directExecutor();
- final List<FlowFileResult> flowFileResults = new ArrayList<>();
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- for (final FlowFile flowFile : flowFileBatch) {
- final List<ApiFuture<String>> futures = new ArrayList<>();
- final List<String> successes = Collections.synchronizedList(new
ArrayList<>());
- final List<Throwable> failures = Collections.synchronizedList(new
ArrayList<>());
-
- if (flowFile.getSize() > maxMessageSize) {
- final String message = String.format("FlowFile size %d exceeds
MAX_MESSAGE_SIZE", flowFile.getSize());
- failures.add(new IllegalArgumentException(message));
- flowFileResults.add(new FlowFileResult(flowFile, futures,
successes, failures, getLogger()));
- } else {
- baos.reset();
- session.exportTo(flowFile, baos);
-
- final ApiFuture<String> apiFuture = publishOneMessage(context,
flowFile, baos.toByteArray());
- futures.add(apiFuture);
- addCallback(apiFuture, new TrackedApiFutureCallback(successes,
failures), executor);
- flowFileResults.add(new FlowFileResult(flowFile, futures,
successes, failures, getLogger()));
- }
- }
- finishBatch(session, stopWatch, flowFileResults);
- }
-
- private void onTriggerRecordStrategy(
- final ProcessContext context,
- final ProcessSession session,
- final StopWatch stopWatch,
- final List<FlowFile> flowFileBatch) throws ProcessException {
- try {
- onTriggerRecordStrategyPublishRecords(context, session, stopWatch,
flowFileBatch);
- } catch (IOException | SchemaNotFoundException |
MalformedRecordException e) {
- throw new ProcessException("Record publishing failed", e);
- }
- }
-
- private void onTriggerRecordStrategyPublishRecords(
- final ProcessContext context,
- final ProcessSession session,
- final StopWatch stopWatch,
- final List<FlowFile> flowFileBatch)
- throws ProcessException, IOException, SchemaNotFoundException,
MalformedRecordException {
- final RecordReaderFactory readerFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
- final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
- final Executor executor = MoreExecutors.directExecutor();
- final List<FlowFileResult> flowFileResults = new ArrayList<>();
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- for (final FlowFile flowFile : flowFileBatch) {
- final List<ApiFuture<String>> futures = new ArrayList<>();
- final List<String> successes = Collections.synchronizedList(new
ArrayList<>());
- final List<Throwable> failures = Collections.synchronizedList(new
ArrayList<>());
-
- final Map<String, String> attributes = flowFile.getAttributes();
- try (final RecordReader reader = readerFactory.createRecordReader(
- attributes, session.read(flowFile), flowFile.getSize(),
getLogger())) {
- final RecordSet recordSet = reader.createRecordSet();
- final RecordSchema schema =
writerFactory.getSchema(attributes, recordSet.getSchema());
-
- final RecordSetWriter writer =
writerFactory.createWriter(getLogger(), schema, baos, attributes);
- final PushBackRecordSet pushBackRecordSet = new
PushBackRecordSet(recordSet);
-
- while (pushBackRecordSet.isAnotherRecord()) {
- final ApiFuture<String> apiFuture =
publishOneRecord(context, flowFile, baos, writer, pushBackRecordSet.next());
- futures.add(apiFuture);
- addCallback(apiFuture, new
TrackedApiFutureCallback(successes, failures), executor);
- }
- flowFileResults.add(new FlowFileResult(flowFile, futures,
successes, failures, getLogger()));
- }
- }
- finishBatch(session, stopWatch, flowFileResults);
- }
-
- private ApiFuture<String> publishOneRecord(
- final ProcessContext context,
- final FlowFile flowFile,
- final ByteArrayOutputStream baos,
- final RecordSetWriter writer,
- final Record record) throws IOException {
- baos.reset();
- writer.write(record);
- writer.flush();
- return publishOneMessage(context, flowFile, baos.toByteArray());
- }
-
- private ApiFuture<String> publishOneMessage(final ProcessContext context,
- final FlowFile flowFile,
- final byte[] content) {
- final PubsubMessage message = PubsubMessage.newBuilder()
- .setData(ByteString.copyFrom(content))
- .setPublishTime(Timestamp.newBuilder().build())
- .putAllAttributes(getDynamicAttributesMap(context, flowFile))
- .build();
- return publisher.publish(message);
- }
-
- private void finishBatch(final ProcessSession session,
- final StopWatch stopWatch,
- final List<FlowFileResult> flowFileResults) {
- final String topicName = publisher.getTopicNameString();
- for (final FlowFileResult flowFileResult : flowFileResults) {
- final Relationship relationship = flowFileResult.reconcile();
- final Map<String, String> attributes =
flowFileResult.getAttributes();
- attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
- final FlowFile flowFile =
session.putAllAttributes(flowFileResult.getFlowFile(), attributes);
- final String transitUri = String.format(TRANSIT_URI_FORMAT_STRING,
topicName);
- session.getProvenanceReporter().send(flowFile, transitUri,
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- session.transfer(flowFile, relationship);
- }
- }
-
- protected void addCallback(final ApiFuture<String> apiFuture, final
ApiFutureCallback<? super String> callback, Executor executor) {
- ApiFutures.addCallback(apiFuture, callback, executor);
- }
-
- @OnStopped
- public void onStopped() {
- shutdownPublisher();
- }
-
- private void shutdownPublisher() {
- try {
- if (publisher != null) {
- publisher.shutdown();
- }
- } catch (Exception e) {
- getLogger().warn("Failed to gracefully shutdown the Google Cloud
PubSub Publisher", e);
- }
- }
-
- private ProjectTopicName getTopicName(ProcessContext context) {
- final String topic =
context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
- final String projectId =
context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
-
- if (topic.contains("/")) {
- return ProjectTopicName.parse(topic);
- } else {
- return ProjectTopicName.of(projectId, topic);
- }
- }
-
- private Map<String, String> getDynamicAttributesMap(ProcessContext
context, FlowFile flowFile) {
- final Map<String, String> attributes = new HashMap<>();
- for (final Map.Entry<PropertyDescriptor, String> entry :
context.getProperties().entrySet()) {
- if (entry.getKey().isDynamic()) {
- final String value =
context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue();
- attributes.put(entry.getKey().getName(), value);
- }
- }
-
- return attributes;
- }
-
- private Publisher.Builder getPublisherBuilder(ProcessContext context) {
- final Long batchSizeThreshold =
context.getProperty(BATCH_SIZE_THRESHOLD).asLong();
- final long batchBytesThreshold =
context.getProperty(BATCH_BYTES_THRESHOLD).asDataSize(DataUnit.B).longValue();
- final Long batchDelayThreshold =
context.getProperty(BATCH_DELAY_THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS);
- final String endpoint = context.getProperty(API_ENDPOINT).getValue();
-
- final Publisher.Builder publisherBuilder =
Publisher.newBuilder(getTopicName(context))
-
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
- .setChannelProvider(getTransportChannelProvider(context))
- .setEndpoint(endpoint);
-
- publisherBuilder.setBatchingSettings(BatchingSettings.newBuilder()
- .setElementCountThreshold(batchSizeThreshold)
- .setRequestByteThreshold(batchBytesThreshold)
- .setDelayThreshold(Duration.ofMillis(batchDelayThreshold))
- .setIsEnabled(true)
- .build());
-
- // Set fixed thread pool executor to number of concurrent tasks
- publisherBuilder.setExecutorProvider(FixedExecutorProvider.create(new
ScheduledThreadPoolExecutor(context.getMaxConcurrentTasks())));
-
- return publisherBuilder;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.core.FixedExecutorProvider;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub;
+import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.iam.v1.TestIamPermissionsRequest;
+import com.google.iam.v1.TestIamPermissionsResponse;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.migration.PropertyConfiguration;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.pubsub.publish.FlowFileResult;
+import org.apache.nifi.processors.gcp.pubsub.publish.MessageDerivationStrategy;
+import org.apache.nifi.processors.gcp.pubsub.publish.TrackedApiFutureCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.StopWatch;
+import org.threeten.bp.Duration;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.RECORDS_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.RECORDS_DESCRIPTION;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
+
+@SeeAlso({ConsumeGCPubSub.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "publish"})
+@CapabilityDescription("Publishes the content of the incoming flowfile to the
configured Google Cloud PubSub topic. The processor supports dynamic
properties." +
+ " If any dynamic properties are present, they will be sent along with
the message in the form of 'attributes'.")
+@DynamicProperty(name = "Attribute name", value = "Value to be set to the
attribute",
+ description = "Attributes to be set for the outgoing Google Cloud
PubSub message", expressionLanguageScope =
ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+@WritesAttributes({
+ @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description =
MESSAGE_ID_DESCRIPTION),
+ @WritesAttribute(attribute = RECORDS_ATTRIBUTE, description =
RECORDS_DESCRIPTION),
+ @WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description =
TOPIC_NAME_DESCRIPTION)
+})
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description =
"The entirety of the FlowFile's content "
+ + "will be read into memory to be sent as a PubSub message.")
+public class PublishGCPubSub extends AbstractGCPubSubProcessor {
+ private static final List<String> REQUIRED_PERMISSIONS =
Collections.singletonList("pubsub.topics.publish");
+ private static final String TRANSIT_URI_FORMAT_STRING = "gcp://%s";
+
+ public static final PropertyDescriptor MAX_BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .name("Input Batch Size")
+ .description("Maximum number of FlowFiles processed for each
Processor invocation")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .addValidator(StandardValidators.NUMBER_VALIDATOR)
+ .defaultValue("100")
+ .build();
+
+ public static final PropertyDescriptor MESSAGE_DERIVATION_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("Message Derivation Strategy")
+ .description("The strategy used to publish the incoming FlowFile
to the Google Cloud PubSub endpoint.")
+ .required(true)
+
.defaultValue(MessageDerivationStrategy.FLOWFILE_ORIENTED.getValue())
+ .allowableValues(MessageDerivationStrategy.class)
+ .build();
+
+ public static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
+ .name("Record Reader")
+ .description("The Record Reader to use for incoming FlowFiles")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .dependsOn(MESSAGE_DERIVATION_STRATEGY,
MessageDerivationStrategy.RECORD_ORIENTED.getValue())
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor RECORD_WRITER = new
PropertyDescriptor.Builder()
+ .name("Record Writer")
+ .description("The Record Writer to use in order to serialize the
data before sending to GCPubSub endpoint")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .dependsOn(MESSAGE_DERIVATION_STRATEGY,
MessageDerivationStrategy.RECORD_ORIENTED.getValue())
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor MAX_MESSAGE_SIZE = new
PropertyDescriptor.Builder()
+ .name("Maximum Message Size")
+ .description("The maximum size of a Google PubSub message in
bytes. Defaults to 1 MB (1048576 bytes)")
+ .dependsOn(MESSAGE_DERIVATION_STRATEGY,
MessageDerivationStrategy.FLOWFILE_ORIENTED.getValue())
+ .required(true)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("1 MB")
+ .build();
+
+ public static final PropertyDescriptor TOPIC_NAME = new
PropertyDescriptor.Builder()
+ .name("Topic Name")
+ .description("Name of the Google Cloud PubSub Topic")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .build();
+
+ public static final Relationship REL_RETRY = new Relationship.Builder()
+ .name("retry")
+ .description("FlowFiles are routed to this relationship if the
Google Cloud Pub/Sub operation fails but attempting the operation again may
succeed.")
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
+ GCP_CREDENTIALS_PROVIDER_SERVICE,
+ PROJECT_ID,
+ TOPIC_NAME,
+ MESSAGE_DERIVATION_STRATEGY,
+ RECORD_READER,
+ RECORD_WRITER,
+ MAX_BATCH_SIZE,
+ MAX_MESSAGE_SIZE,
+ BATCH_SIZE_THRESHOLD,
+ BATCH_BYTES_THRESHOLD,
+ BATCH_DELAY_THRESHOLD,
+ API_ENDPOINT,
+ PROXY_CONFIGURATION_SERVICE
+ );
+
+ public static final Set<Relationship> RELATIONSHIPS = Set.of(
+ REL_SUCCESS,
+ REL_FAILURE,
+ REL_RETRY
+ );
+
+ protected Publisher publisher = null;
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTY_DESCRIPTORS;
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .required(false)
+ .name(propertyDescriptorName)
+ .displayName(propertyDescriptorName)
+
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+ .dynamic(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ @OnScheduled
+ public void onScheduled(ProcessContext context) {
+ try {
+ publisher = getPublisherBuilder(context).build();
+ } catch (IOException e) {
+ throw new ProcessException("Failed to create Google Cloud PubSub
Publisher", e);
+ }
+ }
+
+ @Override
+ public List<ConfigVerificationResult> verify(final ProcessContext context,
final ComponentLog verificationLogger, final Map<String, String> attributes) {
+ final List<ConfigVerificationResult> results = new ArrayList<>();
+ Publisher publisher = null;
+ try {
+ publisher = getPublisherBuilder(context).build();
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Create Publisher")
+ .outcome(Outcome.SUCCESSFUL)
+ .explanation("Successfully created Publisher")
+ .build());
+ } catch (final IOException e) {
+ verificationLogger.error("Failed to create Publisher", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Create Publisher")
+ .outcome(Outcome.FAILED)
+                    .explanation("Failed to create Publisher: " + e.getMessage())
+ .build());
+ }
+
+ if (publisher != null) {
+ try {
+ final PublisherStubSettings publisherStubSettings =
PublisherStubSettings.newBuilder()
+
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
+
.setTransportChannelProvider(getTransportChannelProvider(context))
+ .build();
+
+ try (GrpcPublisherStub publisherStub =
GrpcPublisherStub.create(publisherStubSettings)) {
+ final String topicName =
context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
+ final TestIamPermissionsRequest request =
TestIamPermissionsRequest.newBuilder()
+ .addAllPermissions(REQUIRED_PERMISSIONS)
+ .setResource(topicName)
+ .build();
+ final TestIamPermissionsResponse response =
publisherStub.testIamPermissionsCallable().call(request);
+ if (response.getPermissionsCount() >=
REQUIRED_PERMISSIONS.size()) {
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+ .explanation(String.format("Verified Topic
[%s] exists and the configured user has the correct permissions.", topicName))
+ .build());
+ } else {
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+
.outcome(ConfigVerificationResult.Outcome.FAILED)
+ .explanation(String.format("The configured
user does not have the correct permissions on Topic [%s].", topicName))
+ .build());
+ }
+ }
+ } catch (final ApiException e) {
+ verificationLogger.error("The configured user appears to have
the correct permissions, but the following error was encountered", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+ .outcome(ConfigVerificationResult.Outcome.FAILED)
+                    .explanation("The configured user appears to have the correct permissions, but the following error was encountered: " + e.getMessage())
+ .build());
+ } catch (final IOException e) {
+ verificationLogger.error("The publisher stub could not be
created in order to test the permissions", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+ .outcome(ConfigVerificationResult.Outcome.FAILED)
+                    .explanation("The publisher stub could not be created in order to test the permissions: " + e.getMessage())
+ .build());
+
+ }
+ }
+ return results;
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ final StopWatch stopWatch = new StopWatch(true);
+ final MessageDerivationStrategy inputStrategy =
MessageDerivationStrategy.valueOf(context.getProperty(MESSAGE_DERIVATION_STRATEGY).getValue());
+ final int maxBatchSize =
context.getProperty(MAX_BATCH_SIZE).asInteger();
+ final List<FlowFile> flowFileBatch = session.get(maxBatchSize);
+ if (flowFileBatch.isEmpty()) {
+ context.yield();
+ } else if
(MessageDerivationStrategy.FLOWFILE_ORIENTED.equals(inputStrategy)) {
+ onTriggerFlowFileStrategy(context, session, stopWatch,
flowFileBatch);
+ } else if
(MessageDerivationStrategy.RECORD_ORIENTED.equals(inputStrategy)) {
+ onTriggerRecordStrategy(context, session, stopWatch,
flowFileBatch);
+ } else {
+ throw new IllegalStateException(inputStrategy.getValue());
+ }
+ }
+
+ @Override
+ public void migrateProperties(PropertyConfiguration config) {
+ super.migrateProperties(config);
+ config.renameProperty("gcp-pubsub-topic", TOPIC_NAME.getName());
+ }
+
+ private void onTriggerFlowFileStrategy(
+ final ProcessContext context,
+ final ProcessSession session,
+ final StopWatch stopWatch,
+ final List<FlowFile> flowFileBatch) throws ProcessException {
+ final long maxMessageSize =
context.getProperty(MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).longValue();
+
+ final Executor executor = MoreExecutors.directExecutor();
+ final List<FlowFileResult> flowFileResults = new ArrayList<>();
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ for (final FlowFile flowFile : flowFileBatch) {
+ final List<ApiFuture<String>> futures = new ArrayList<>();
+ final List<String> successes = Collections.synchronizedList(new
ArrayList<>());
+ final List<Throwable> failures = Collections.synchronizedList(new
ArrayList<>());
+
+ if (flowFile.getSize() > maxMessageSize) {
+ final String message = String.format("FlowFile size %d exceeds
MAX_MESSAGE_SIZE", flowFile.getSize());
+ failures.add(new IllegalArgumentException(message));
+ flowFileResults.add(new FlowFileResult(flowFile, futures,
successes, failures, getLogger()));
+ } else {
+ baos.reset();
+ session.exportTo(flowFile, baos);
+
+ final ApiFuture<String> apiFuture = publishOneMessage(context,
flowFile, baos.toByteArray());
+ futures.add(apiFuture);
+ addCallback(apiFuture, new TrackedApiFutureCallback(successes,
failures), executor);
+ flowFileResults.add(new FlowFileResult(flowFile, futures,
successes, failures, getLogger()));
+ }
+ }
+ finishBatch(session, stopWatch, flowFileResults);
+ }
+
+ private void onTriggerRecordStrategy(
+ final ProcessContext context,
+ final ProcessSession session,
+ final StopWatch stopWatch,
+ final List<FlowFile> flowFileBatch) throws ProcessException {
+ try {
+ onTriggerRecordStrategyPublishRecords(context, session, stopWatch,
flowFileBatch);
+ } catch (IOException | SchemaNotFoundException |
MalformedRecordException e) {
+ throw new ProcessException("Record publishing failed", e);
+ }
+ }
+
+ private void onTriggerRecordStrategyPublishRecords(
+ final ProcessContext context,
+ final ProcessSession session,
+ final StopWatch stopWatch,
+ final List<FlowFile> flowFileBatch)
+ throws ProcessException, IOException, SchemaNotFoundException,
MalformedRecordException {
+ final RecordReaderFactory readerFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+ final Executor executor = MoreExecutors.directExecutor();
+ final List<FlowFileResult> flowFileResults = new ArrayList<>();
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ for (final FlowFile flowFile : flowFileBatch) {
+ final List<ApiFuture<String>> futures = new ArrayList<>();
+ final List<String> successes = Collections.synchronizedList(new
ArrayList<>());
+ final List<Throwable> failures = Collections.synchronizedList(new
ArrayList<>());
+
+ final Map<String, String> attributes = flowFile.getAttributes();
+ try (final RecordReader reader = readerFactory.createRecordReader(
+ attributes, session.read(flowFile), flowFile.getSize(),
getLogger())) {
+ final RecordSet recordSet = reader.createRecordSet();
+ final RecordSchema schema =
writerFactory.getSchema(attributes, recordSet.getSchema());
+
+ final RecordSetWriter writer =
writerFactory.createWriter(getLogger(), schema, baos, attributes);
+ final PushBackRecordSet pushBackRecordSet = new
PushBackRecordSet(recordSet);
+
+ while (pushBackRecordSet.isAnotherRecord()) {
+ final ApiFuture<String> apiFuture =
publishOneRecord(context, flowFile, baos, writer, pushBackRecordSet.next());
+ futures.add(apiFuture);
+ addCallback(apiFuture, new
TrackedApiFutureCallback(successes, failures), executor);
+ }
+ flowFileResults.add(new FlowFileResult(flowFile, futures,
successes, failures, getLogger()));
+ }
+ }
+ finishBatch(session, stopWatch, flowFileResults);
+ }
+
+ private ApiFuture<String> publishOneRecord(
+ final ProcessContext context,
+ final FlowFile flowFile,
+ final ByteArrayOutputStream baos,
+ final RecordSetWriter writer,
+ final Record record) throws IOException {
+ baos.reset();
+ writer.write(record);
+ writer.flush();
+ return publishOneMessage(context, flowFile, baos.toByteArray());
+ }
+
+ private ApiFuture<String> publishOneMessage(final ProcessContext context,
+ final FlowFile flowFile,
+ final byte[] content) {
+ final PubsubMessage message = PubsubMessage.newBuilder()
+ .setData(ByteString.copyFrom(content))
+ .setPublishTime(Timestamp.newBuilder().build())
+ .putAllAttributes(getDynamicAttributesMap(context, flowFile))
+ .build();
+ return publisher.publish(message);
+ }
+
+ private void finishBatch(final ProcessSession session,
+ final StopWatch stopWatch,
+ final List<FlowFileResult> flowFileResults) {
+ final String topicName = publisher.getTopicNameString();
+ for (final FlowFileResult flowFileResult : flowFileResults) {
+ final Relationship relationship = flowFileResult.reconcile();
+ final Map<String, String> attributes =
flowFileResult.getAttributes();
+ attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
+ final FlowFile flowFile =
session.putAllAttributes(flowFileResult.getFlowFile(), attributes);
+ final String transitUri = String.format(TRANSIT_URI_FORMAT_STRING,
topicName);
+ session.getProvenanceReporter().send(flowFile, transitUri,
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ session.transfer(flowFile, relationship);
+ }
+ }
+
+ protected void addCallback(final ApiFuture<String> apiFuture, final
ApiFutureCallback<? super String> callback, Executor executor) {
+ ApiFutures.addCallback(apiFuture, callback, executor);
+ }
+
+ @OnStopped
+ public void onStopped() {
+ shutdownPublisher();
+ }
+
+ private void shutdownPublisher() {
+ try {
+ if (publisher != null) {
+ publisher.shutdown();
+ }
+ } catch (Exception e) {
+ getLogger().warn("Failed to gracefully shutdown the Google Cloud
PubSub Publisher", e);
+ }
+ }
+
+ private ProjectTopicName getTopicName(ProcessContext context) {
+ final String topic =
context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
+ final String projectId =
context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
+
+ if (topic.contains("/")) {
+ return ProjectTopicName.parse(topic);
+ } else {
+ return ProjectTopicName.of(projectId, topic);
+ }
+ }
+
+ private Map<String, String> getDynamicAttributesMap(ProcessContext
context, FlowFile flowFile) {
+ final Map<String, String> attributes = new HashMap<>();
+ for (final Map.Entry<PropertyDescriptor, String> entry :
context.getProperties().entrySet()) {
+ if (entry.getKey().isDynamic()) {
+ final String value =
context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue();
+ attributes.put(entry.getKey().getName(), value);
+ }
+ }
+
+ return attributes;
+ }
+
+ private Publisher.Builder getPublisherBuilder(ProcessContext context) {
+ final Long batchSizeThreshold =
context.getProperty(BATCH_SIZE_THRESHOLD).asLong();
+ final long batchBytesThreshold =
context.getProperty(BATCH_BYTES_THRESHOLD).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue();
+ final Long batchDelayThreshold =
context.getProperty(BATCH_DELAY_THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS);
+ final String endpoint =
context.getProperty(API_ENDPOINT).evaluateAttributeExpressions().getValue();
+
+ final Publisher.Builder publisherBuilder =
Publisher.newBuilder(getTopicName(context))
+
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
+ .setChannelProvider(getTransportChannelProvider(context))
+ .setEndpoint(endpoint);
+
+ publisherBuilder.setBatchingSettings(BatchingSettings.newBuilder()
+ .setElementCountThreshold(batchSizeThreshold)
+ .setRequestByteThreshold(batchBytesThreshold)
+ .setDelayThreshold(Duration.ofMillis(batchDelayThreshold))
+ .setIsEnabled(true)
+ .build());
+
+ // Set fixed thread pool executor to number of concurrent tasks
+ publisherBuilder.setExecutorProvider(FixedExecutorProvider.create(new
ScheduledThreadPoolExecutor(context.getMaxConcurrentTasks())));
+
+ return publisherBuilder;
+ }
+}
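
For illustration, the batching configuration assembled in getPublisherBuilder() above can be read in isolation. The following minimal sketch builds a Publisher the same way, with placeholder project, topic, thresholds, and pool size (none of these values are taken from the commit); no credentials provider is set, so the client falls back to Application Default Credentials.

    import com.google.api.gax.batching.BatchingSettings;
    import com.google.api.gax.core.FixedExecutorProvider;
    import com.google.cloud.pubsub.v1.Publisher;
    import com.google.pubsub.v1.ProjectTopicName;
    import org.threeten.bp.Duration;

    import java.io.IOException;
    import java.util.concurrent.ScheduledThreadPoolExecutor;

    public class PublisherBatchingSketch {
        public static Publisher buildPublisher() throws IOException {
            // A batch is flushed when the first threshold trips: element count,
            // accumulated bytes, or time elapsed since the first queued message.
            final BatchingSettings batchingSettings = BatchingSettings.newBuilder()
                    .setElementCountThreshold(100L)
                    .setRequestByteThreshold(1024L * 1024L)
                    .setDelayThreshold(Duration.ofMillis(100))
                    .setIsEnabled(true)
                    .build();
            return Publisher.newBuilder(ProjectTopicName.of("my-project", "my-topic"))
                    .setBatchingSettings(batchingSettings)
                    // As in the processor: one scheduler thread per concurrent task.
                    .setExecutorProvider(FixedExecutorProvider.create(new ScheduledThreadPoolExecutor(4)))
                    .build();
        }
    }

In practice this means a slow trickle of messages publishes on the delay threshold, while bursts flush as soon as the count or byte threshold is reached, whichever comes first.
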
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
index e5a8f1eaaf..76a3da1e72 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
@@ -43,6 +43,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_DESC;
+
/**
* Base class for creating processors which connect to Google Cloud Storage.
*
@@ -68,6 +70,14 @@ public abstract class AbstractGCSProcessor extends
AbstractGCPProcessor<Storage,
return RELATIONSHIPS;
}
+ public static final PropertyDescriptor BUCKET = new
PropertyDescriptor.Builder()
+ .name("Bucket")
+ .description(BUCKET_DESC)
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .build();
+
//
https://cloud.google.com/storage/docs/request-endpoints#storage-set-client-endpoint-java
public static final PropertyDescriptor STORAGE_API_URL = new
PropertyDescriptor.Builder()
.name("Storage API URL")
@@ -137,7 +147,7 @@ public abstract class AbstractGCSProcessor extends
AbstractGCPProcessor<Storage,
protected abstract List<String> getRequiredPermissions();
protected String getBucketName(final ProcessContext context, final
Map<String, String> attributes) {
- return
context.getProperty("gcs-bucket").evaluateAttributeExpressions(attributes).getValue();
+ return
context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
}
@Override
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/DeleteGCSObject.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/DeleteGCSObject.java
index 61dc11f5ac..31ab641f1e 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/DeleteGCSObject.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/DeleteGCSObject.java
@@ -37,7 +37,6 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR;
-import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_DESC;
import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.KEY_DESC;
@@ -49,12 +48,9 @@ import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.KEY_DESC;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
public class DeleteGCSObject extends AbstractGCSProcessor {
public static final PropertyDescriptor BUCKET = new
PropertyDescriptor.Builder()
- .name("Bucket")
- .description(BUCKET_DESC)
- .required(true)
+ .fromPropertyDescriptor(AbstractGCSProcessor.BUCKET)
.defaultValue("${" + BUCKET_ATTR + "}")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor KEY = new
PropertyDescriptor.Builder()
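
The .fromPropertyDescriptor() pattern used above, and again in the FetchGCSObject and PutGCSObject hunks below, copies the shared base descriptor and overrides only what differs. A minimal sketch, assuming illustrative names: "${gcs.bucket}" stands in for "${" + BUCKET_ATTR + "}", and the real base constant is AbstractGCSProcessor.BUCKET.

    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.expression.ExpressionLanguageScope;
    import org.apache.nifi.processor.util.StandardValidators;

    class BucketDescriptorSketch {
        // Base descriptor, defined once for all GCS processors.
        static final PropertyDescriptor BASE_BUCKET = new PropertyDescriptor.Builder()
                .name("Bucket")
                .description("Bucket of the object.")
                .required(true)
                .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
                .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
                .build();

        // Per-processor copy: name, description, requiredness, and validator are
        // inherited; only the default value and Expression Language scope are
        // overridden so the bucket can be resolved from each incoming FlowFile.
        static final PropertyDescriptor BUCKET = new PropertyDescriptor.Builder()
                .fromPropertyDescriptor(BASE_BUCKET)
                .defaultValue("${gcs.bucket}")
                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                .build();
    }
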
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
index e900e6d1ce..a9b5dd9915 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
@@ -169,12 +169,9 @@ import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_DESC;
)
public class FetchGCSObject extends AbstractGCSProcessor {
public static final PropertyDescriptor BUCKET = new
PropertyDescriptor.Builder()
- .name("Bucket")
- .description(BUCKET_DESC)
- .required(true)
+ .fromPropertyDescriptor(AbstractGCSProcessor.BUCKET)
.defaultValue("${" + BUCKET_ATTR + "}")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor KEY = new
PropertyDescriptor.Builder()
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index 3f0ba5386b..b3d021ac57 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -221,14 +221,6 @@ public class ListGCSBucket extends AbstractGCSProcessor {
.required(true)
.build();
- public static final PropertyDescriptor BUCKET = new
PropertyDescriptor.Builder()
- .name("Bucket")
- .description(BUCKET_DESC)
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
- .build();
-
public static final PropertyDescriptor PREFIX = new
PropertyDescriptor.Builder()
.name("Prefix")
.description("The prefix used to filter the object list. In most
cases, it should end with a forward slash ('/').")
@@ -409,11 +401,6 @@ public class ListGCSBucket extends AbstractGCSProcessor {
return Collections.singletonList("storage.objects.list");
}
- @Override
- protected String getBucketName(final ProcessContext context, final
Map<String, String> attributes) {
- return
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
- }
-
@Override
public List<ConfigVerificationResult> verify(final ProcessContext context,
final ComponentLog verificationLogger, final Map<String, String> attributes) {
final List<ConfigVerificationResult> results = new
ArrayList<>(super.verify(context, verificationLogger, attributes));
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
index 7c59e43dd4..e54b79e2bc 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
@@ -148,12 +148,9 @@ import static
org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileR
})
public class PutGCSObject extends AbstractGCSProcessor {
public static final PropertyDescriptor BUCKET = new
PropertyDescriptor.Builder()
- .name("Bucket")
- .description(BUCKET_DESC)
- .required(true)
+ .fromPropertyDescriptor(AbstractGCSProcessor.BUCKET)
.defaultValue("${" + BUCKET_ATTR + "}")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor KEY = new
PropertyDescriptor.Builder()
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java
index 03810ad661..276aafcd3b 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/AbstractGoogleDriveIT.java
@@ -27,6 +27,7 @@ import com.google.api.services.drive.DriveScopes;
import com.google.api.services.drive.model.File;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processor.Processor;
+import
org.apache.nifi.processors.gcp.credentials.factory.AuthenticationStrategy;
import
org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
import
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.processors.gcp.util.GoogleUtils;
@@ -106,6 +107,7 @@ public abstract class AbstractGoogleDriveIT<T extends
GoogleDriveTrait & Process
GCPCredentialsControllerService gcpCredentialsControllerService = new
GCPCredentialsControllerService();
testRunner.addControllerService("gcp_credentials_provider_service",
gcpCredentialsControllerService);
+ testRunner.setProperty(gcpCredentialsControllerService,
CredentialPropertyDescriptors.AUTHENTICATION_STRATEGY,
AuthenticationStrategy.SERVICE_ACCOUNT_JSON_FILE);
testRunner.setProperty(gcpCredentialsControllerService,
CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE,
CREDENTIAL_JSON_FILE_PATH);
testRunner.enableControllerService(gcpCredentialsControllerService);
testRunner.setProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE,
"gcp_credentials_provider_service");
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubIT.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubIT.java
index 933e6789bd..a19b7ea47b 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubIT.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubIT.java
@@ -1,47 +1,48 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.gcp.pubsub;
-
-import
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.TestRunner;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class AbstractGCPubSubIT {
-
- protected static final String PROJECT_ID = "my-gcm-client";
- protected static final String CONTROLLER_SERVICE = "GCPCredentialsService";
- protected static TestRunner runner;
-
- protected TestRunner setCredentialsCS(TestRunner runner) throws
InitializationException {
- final String serviceAccountJsonFilePath = "path/to/credentials/json";
- final Map<String, String> propertiesMap = new HashMap<>();
- final GCPCredentialsControllerService credentialsControllerService =
new GCPCredentialsControllerService();
-
- propertiesMap.put("application-default-credentials", "false");
- propertiesMap.put("compute-engine-credentials", "false");
- propertiesMap.put("service-account-json-file",
serviceAccountJsonFilePath);
-
- runner.addControllerService(CONTROLLER_SERVICE,
credentialsControllerService, propertiesMap);
- runner.enableControllerService(credentialsControllerService);
- runner.assertValid(credentialsControllerService);
-
- return runner;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import
org.apache.nifi.processors.gcp.credentials.factory.AuthenticationStrategy;
+import
org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
+import
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AbstractGCPubSubIT {
+
+ protected static final String PROJECT_ID = "my-gcm-client";
+ protected static final String CONTROLLER_SERVICE = "GCPCredentialsService";
+ protected static TestRunner runner;
+
+ protected TestRunner setCredentialsCS(TestRunner runner) throws
InitializationException {
+ final String serviceAccountJsonFilePath = "path/to/credentials/json";
+ final Map<String, String> propertiesMap = new HashMap<>();
+ final GCPCredentialsControllerService credentialsControllerService =
new GCPCredentialsControllerService();
+
+
propertiesMap.put(CredentialPropertyDescriptors.AUTHENTICATION_STRATEGY.getName(),
AuthenticationStrategy.SERVICE_ACCOUNT_JSON_FILE.getValue());
+
propertiesMap.put(CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE.getName(),
serviceAccountJsonFilePath);
+
+ runner.addControllerService(CONTROLLER_SERVICE,
credentialsControllerService, propertiesMap);
+ runner.enableControllerService(credentialsControllerService);
+ runner.assertValid(credentialsControllerService);
+
+ return runner;
+ }
+}
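
The rewritten helper above drives the credentials service through a raw properties map. An equivalent minimal sketch using typed setProperty() calls, with a placeholder service identifier and JSON path:

    import org.apache.nifi.processors.gcp.credentials.factory.AuthenticationStrategy;
    import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
    import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
    import org.apache.nifi.reporting.InitializationException;
    import org.apache.nifi.util.TestRunner;

    class CredentialsSetupSketch {
        static void configureCredentials(final TestRunner runner) throws InitializationException {
            final GCPCredentialsControllerService service = new GCPCredentialsControllerService();
            runner.addControllerService("gcpCredentialsService", service);
            // Mirrors the change in this commit: the authentication strategy
            // is now set explicitly rather than implied by boolean flags.
            runner.setProperty(service, CredentialPropertyDescriptors.AUTHENTICATION_STRATEGY,
                    AuthenticationStrategy.SERVICE_ACCOUNT_JSON_FILE);
            runner.setProperty(service, CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE,
                    "path/to/credentials.json");
            runner.enableControllerService(service);
            runner.assertValid(service);
        }
    }
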
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/AbstractGCSIT.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/AbstractGCSIT.java
index c50b365429..34bbf97918 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/AbstractGCSIT.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/AbstractGCSIT.java
@@ -24,6 +24,8 @@ import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.testing.RemoteStorageHelper;
import org.apache.nifi.processor.Processor;
+import
org.apache.nifi.processors.gcp.credentials.factory.AuthenticationStrategy;
+import
org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
import
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -94,6 +96,7 @@ public abstract class AbstractGCSIT {
final GCPCredentialsControllerService credentialsControllerService =
new GCPCredentialsControllerService();
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.addControllerService("gcpCredentialsControllerService",
credentialsControllerService);
+ runner.setProperty(credentialsControllerService,
CredentialPropertyDescriptors.AUTHENTICATION_STRATEGY,
AuthenticationStrategy.APPLICATION_DEFAULT);
runner.enableControllerService(credentialsControllerService);
runner.setProperty(AbstractGCSProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE,
"gcpCredentialsControllerService");
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectIT.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectIT.java
index 18e817284b..c6a812ad59 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectIT.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectIT.java
@@ -87,9 +87,9 @@ public class FetchGCSObjectIT extends AbstractGCSIT {
final Map<String, String> attributes = new HashMap<>();
attributes.put("filename", KEY);
final List<ConfigVerificationResult> results =
processor.verify(runner.getProcessContext(), runner.getLogger(), attributes);
- assertEquals(2, results.size());
+ assertEquals(3, results.size());
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL,
results.get(1).getOutcome());
- assertTrue(results.get(1).getExplanation().matches("Successfully
fetched \\[delete-me\\] from Bucket \\[gcloud-test-bucket-temp-.*\\], totaling
3 bytes"));
+ assertTrue(results.get(2).getExplanation().matches("Successfully
fetched \\[delete-me\\] from Bucket \\[gcloud-test-bucket-temp-.*\\], totaling
3 bytes"));
runner.assertAllFlowFilesTransferred(FetchGCSObject.REL_SUCCESS, 1);
final List<MockFlowFile> ffs =
runner.getFlowFilesForRelationship(FetchGCSObject.REL_SUCCESS);