[
https://issues.apache.org/jira/browse/NIFI-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16484019#comment-16484019
]
ASF GitHub Bot commented on NIFI-5133:
--------------------------------------
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2724#discussion_r189905474
--- Diff:
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
---
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.core.InstantiatingExecutorProvider;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
+import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
+
+@SeeAlso({ConsumeGCPubSub.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"google", "google-cloud", "pubsub", "publish"})
+@CapabilityDescription("Publishes the content of the incoming flowfile to
the configured Google Cloud PubSub topic. The processor supports dynamic
properties" +
+ "to be added. If any such dynamic properties are present, they
will be sent along with the message in the form of 'attributes'.")
+@DynamicProperty(name = "Attribute name", value = "Value to be set to the
attribute",
+ description = "Attributes to be set for the outgoing Google Cloud
PubSub message", expressionLanguageScope =
ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+@WritesAttributes({
+ @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description =
MESSAGE_ID_DESCRIPTION),
+ @WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description =
TOPIC_NAME_DESCRIPTION)
+})
+public class PublishGCPubSub extends AbstractGCPubSubProcessor{
+
+ public static final PropertyDescriptor TOPIC_NAME = new
PropertyDescriptor.Builder()
+ .name("gcp-pubsub-topic")
+ .displayName("Topic Name")
+ .description("Name of the Google Cloud PubSub Topic")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor EXECUTOR_COUNT = new
PropertyDescriptor.Builder()
+ .name("gcp-pubsub-publish-executor-count")
+ .displayName("Executor Count")
+ .description("Indicates the number of executors the cloud
service should use to publish the messages")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ public static final Relationship REL_RETRY = new Relationship.Builder()
+ .name("retry")
+ .description("FlowFiles are routed to this relationship if the
Google Cloud Pub/Sub operation fails but attempting the operation again may
succeed.")
+ .build();
+
+ private Publisher publisher = null;
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return ImmutableList.of(PROJECT_ID,
+ GCP_CREDENTIALS_PROVIDER_SERVICE,
+ TOPIC_NAME,
+ EXECUTOR_COUNT,
+ BATCH_SIZE);
+ }
+
+ @Override
+ protected PropertyDescriptor
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .required(false)
+ .name(propertyDescriptorName)
+ .displayName(propertyDescriptorName)
+
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+ .dynamic(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.unmodifiableSet(
+ new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE,
REL_RETRY))
+ );
+ }
+
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ FlowFile flowFile = session.get();
+
+ if (flowFile == null) {
+ context.yield();
+ return;
+ }
+
+ final long startNanos = System.nanoTime();
+
+ final Map<String, String> attributes = new HashMap<>();
+
+ try {
+ publisher = getPublisherBuilder(context, flowFile).build();
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ session.exportTo(flowFile, baos);
+ final ByteString flowFileContent =
ByteString.copyFromUtf8(baos.toString());
+
+ PubsubMessage message =
PubsubMessage.newBuilder().setData(flowFileContent)
+ .setPublishTime(Timestamp.newBuilder().build())
+ .putAllAttributes(getDynamicAttributesMap(context,
flowFile))
+ .build();
+
+ ApiFuture<String> messageIdFuture = publisher.publish(message);
+
+ while (messageIdFuture.isDone()) {
+ Thread.sleep(1000L);
+ }
+
+ final String messageId = messageIdFuture.get();
+
+ attributes.put(MESSAGE_ID_ATTRIBUTE, messageId);
+ attributes.put(TOPIC_NAME_ATTRIBUTE, getTopicName(context,
flowFile).toString());
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ } catch (IOException e) {
+ getLogger().error("Routing to 'failure'. Failed to build the
Google Cloud PubSub Publisher due to ", e);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ } catch (InterruptedException | ExecutionException e) {
+ getLogger().error("Routing to 'retry'. Failed to publish the
message to Google PubSub topic due to ", e);
+ session.transfer(flowFile, REL_RETRY);
+ return;
+ } finally {
+ shutdownPublisher();
--- End diff --
What's the performance cost of creating and shutting down the publisher for
each flow file? I understand it's done so that flow file attributes can be
used, but I'm wondering if it's worth the perf cost.
> Create a processor to Publish and Subscribe to Google Pub/Sub
> -------------------------------------------------------------
>
> Key: NIFI-5133
> URL: https://issues.apache.org/jira/browse/NIFI-5133
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Affects Versions: 1.6.0
> Reporter: Abdelkrim Hadjidj
> Assignee: Sivaprasanna Sethuraman
> Priority: Major
>
> As a workflow designer, I would like to publish/subscribe messages to/from
> Google Pub/Sub. This integration can enable several ingestion and real-time
> use cases where data is available on Google Cloud.
> There are a few options that are outside the NiFi project:
> [https://github.com/ammitt90/nifi-pubsub-processor]
> [https://github.com/synack/nifi-gcp-pubsub-publisher]
> [https://github.com/synack/nifi-gcp-pubsub-consumer]
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)