[ https://issues.apache.org/jira/browse/NIFI-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16484019#comment-16484019 ]

ASF GitHub Bot commented on NIFI-5133:
--------------------------------------

GitHub user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2724#discussion_r189905474
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java ---
    @@ -0,0 +1,240 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.gcp.pubsub;
    +
    +import com.google.api.core.ApiFuture;
    +import com.google.api.gax.batching.BatchingSettings;
    +import com.google.api.gax.core.FixedCredentialsProvider;
    +import com.google.api.gax.core.InstantiatingExecutorProvider;
    +import com.google.cloud.pubsub.v1.Publisher;
    +import com.google.common.collect.ImmutableList;
    +import com.google.protobuf.ByteString;
    +import com.google.protobuf.Timestamp;
    +import com.google.pubsub.v1.ProjectTopicName;
    +import com.google.pubsub.v1.PubsubMessage;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +
    +import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
    +import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
    +import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
    +import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
    +
    +@SeeAlso({ConsumeGCPubSub.class})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"google", "google-cloud", "pubsub", "publish"})
    +@CapabilityDescription("Publishes the content of the incoming flowfile to 
the configured Google Cloud PubSub topic. The processor supports dynamic 
properties" +
    +        "to be added. If any such dynamic properties are present, they 
will be sent along with the message in the form of 'attributes'.")
    +@DynamicProperty(name = "Attribute name", value = "Value to be set to the 
attribute",
    +        description = "Attributes to be set for the outgoing Google Cloud 
PubSub message", expressionLanguageScope = 
ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +@WritesAttributes({
    +        @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = 
MESSAGE_ID_DESCRIPTION),
    +        @WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description = 
TOPIC_NAME_DESCRIPTION)
    +})
    +public class PublishGCPubSub extends AbstractGCPubSubProcessor{
    +
    +    /**
    +     * Required name of the Pub/Sub topic to publish to. The value may use Expression Language
    +     * evaluated against the incoming FlowFile's attributes.
    +     */
    +    public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder()
    +            .name("gcp-pubsub-topic")
    +            .displayName("Topic Name")
    +            .required(true)
    +            .description("Name of the Google Cloud PubSub Topic")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            .build();
    +
    +    /** Optional positive number of executors handed to the publisher client. */
    +    public static final PropertyDescriptor EXECUTOR_COUNT = new PropertyDescriptor.Builder()
    +            .name("gcp-pubsub-publish-executor-count")
    +            .displayName("Executor Count")
    +            .required(false)
    +            .description("Indicates the number of executors the cloud service should use to publish the messages")
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    /** Destination for FlowFiles whose publish attempt failed in a way that may succeed on a later attempt. */
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .description("FlowFiles are routed to this relationship if the Google Cloud Pub/Sub operation fails but attempting the operation again may succeed.")
    +            .name("retry")
    +            .build();
    +
    +    // NOTE(review): onTrigger reassigns this field for every FlowFile. If the processor is
    +    // scheduled with more than one concurrent task, those invocations would share it; confirm
    +    // that this is safe.
    +    private Publisher publisher;
    +
    +    /**
    +     * Returns the processor's configurable properties in a fixed order: project ID,
    +     * credentials service, topic name, executor count, then batch size.
    +     */
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> descriptors = ImmutableList.of(
    +                PROJECT_ID, GCP_CREDENTIALS_PROVIDER_SERVICE, TOPIC_NAME, EXECUTOR_COUNT, BATCH_SIZE);
    +        return descriptors;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor 
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .required(false)
    +                .name(propertyDescriptorName)
    +                .displayName(propertyDescriptorName)
    +                
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
    +                .dynamic(true)
    +                
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +                .build();
    +    }
    +
    +    /**
    +     * Returns the processor's outgoing relationships: success, failure and retry.
    +     * A new read-only set is built on every call.
    +     */
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        final Set<Relationship> relationships = new HashSet<>();
    +        Collections.addAll(relationships, REL_SUCCESS, REL_FAILURE, REL_RETRY);
    +        return Collections.unmodifiableSet(relationships);
    +    }
    +
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +        FlowFile flowFile = session.get();
    +
    +        if (flowFile == null) {
    +            context.yield();
    +            return;
    +        }
    +
    +        final long startNanos = System.nanoTime();
    +
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        try {
    +            publisher = getPublisherBuilder(context, flowFile).build();
    +
    +            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +            session.exportTo(flowFile, baos);
    +            final ByteString flowFileContent = 
ByteString.copyFromUtf8(baos.toString());
    +
    +            PubsubMessage message = 
PubsubMessage.newBuilder().setData(flowFileContent)
    +                    .setPublishTime(Timestamp.newBuilder().build())
    +                    .putAllAttributes(getDynamicAttributesMap(context, 
flowFile))
    +                    .build();
    +
    +            ApiFuture<String> messageIdFuture = publisher.publish(message);
    +
    +            while (messageIdFuture.isDone()) {
    +                Thread.sleep(1000L);
    +            }
    +
    +            final String messageId = messageIdFuture.get();
    +
    +            attributes.put(MESSAGE_ID_ATTRIBUTE, messageId);
    +            attributes.put(TOPIC_NAME_ATTRIBUTE, getTopicName(context, 
flowFile).toString());
    +
    +            flowFile = session.putAllAttributes(flowFile, attributes);
    +        } catch (IOException e) {
    +            getLogger().error("Routing to 'failure'. Failed to build the 
Google Cloud PubSub Publisher due to ", e);
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        } catch (InterruptedException | ExecutionException e) {
    +            getLogger().error("Routing to 'retry'. Failed to publish the 
message to Google PubSub topic due to ", e);
    +            session.transfer(flowFile, REL_RETRY);
    +            return;
    +        } finally {
    +            shutdownPublisher();
    --- End diff --
    
    What's the performance cost of creating and shutting down the publisher for each flow file? I understand it's done so that flow file attributes can be used, but I'm wondering whether that's worth the performance cost.


> Create a processor to Publish and Subscribe to Google Pub/Sub
> -------------------------------------------------------------
>
>                 Key: NIFI-5133
>                 URL: https://issues.apache.org/jira/browse/NIFI-5133
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 1.6.0
>            Reporter: Abdelkrim Hadjidj
>            Assignee: Sivaprasanna Sethuraman
>            Priority: Major
>
> As a workflow designer, I would like to publish messages to and consume
> messages from Google Pub/Sub. This integration would enable several ingestion
> and real-time use cases where data is available on Google Cloud.
> There are a few existing options outside the NiFi project:
> [https://github.com/ammitt90/nifi-pubsub-processor]
> [https://github.com/synack/nifi-gcp-pubsub-publisher]
> [https://github.com/synack/nifi-gcp-pubsub-consumer]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to