[ 
https://issues.apache.org/jira/browse/NIFI-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16484228#comment-16484228
 ] 

ASF GitHub Bot commented on NIFI-5133:
--------------------------------------

Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2724#discussion_r189965330
  
    --- Diff: 
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
 ---
    @@ -0,0 +1,240 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.gcp.pubsub;
    +
    +import com.google.api.core.ApiFuture;
    +import com.google.api.gax.batching.BatchingSettings;
    +import com.google.api.gax.core.FixedCredentialsProvider;
    +import com.google.api.gax.core.InstantiatingExecutorProvider;
    +import com.google.cloud.pubsub.v1.Publisher;
    +import com.google.common.collect.ImmutableList;
    +import com.google.protobuf.ByteString;
    +import com.google.protobuf.Timestamp;
    +import com.google.pubsub.v1.ProjectTopicName;
    +import com.google.pubsub.v1.PubsubMessage;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +
    +import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
    +import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
    +import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
    +import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
    +
    +@SeeAlso({ConsumeGCPubSub.class})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"google", "google-cloud", "pubsub", "publish"})
    +@CapabilityDescription("Publishes the content of the incoming flowfile to 
the configured Google Cloud PubSub topic. The processor supports dynamic 
properties" +
    +        "to be added. If any such dynamic properties are present, they 
will be sent along with the message in the form of 'attributes'.")
    +@DynamicProperty(name = "Attribute name", value = "Value to be set to the 
attribute",
    +        description = "Attributes to be set for the outgoing Google Cloud 
PubSub message", expressionLanguageScope = 
ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +@WritesAttributes({
    +        @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = 
MESSAGE_ID_DESCRIPTION),
    +        @WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description = 
TOPIC_NAME_DESCRIPTION)
    +})
    +public class PublishGCPubSub extends AbstractGCPubSubProcessor{
    +
    +    public static final PropertyDescriptor TOPIC_NAME = new 
PropertyDescriptor.Builder()
    +            .name("gcp-pubsub-topic")
    +            .displayName("Topic Name")
    +            .description("Name of the Google Cloud PubSub Topic")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +
    +    public static final PropertyDescriptor EXECUTOR_COUNT = new 
PropertyDescriptor.Builder()
    +            .name("gcp-pubsub-publish-executor-count")
    +            .displayName("Executor Count")
    +            .description("Indicates the number of executors the cloud 
service should use to publish the messages")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("FlowFiles are routed to this relationship if the 
Google Cloud Pub/Sub operation fails but attempting the operation again may 
succeed.")
    +            .build();
    +
    +    private Publisher publisher = null;
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return ImmutableList.of(PROJECT_ID,
    +                GCP_CREDENTIALS_PROVIDER_SERVICE,
    +                TOPIC_NAME,
    +                EXECUTOR_COUNT,
    +                BATCH_SIZE);
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor 
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .required(false)
    +                .name(propertyDescriptorName)
    +                .displayName(propertyDescriptorName)
    +                
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
    +                .dynamic(true)
    +                
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +                .build();
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(
    +                new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, 
REL_RETRY))
    +        );
    +    }
    +
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +        FlowFile flowFile = session.get();
    +
    +        if (flowFile == null) {
    +            context.yield();
    +            return;
    +        }
    +
    +        final long startNanos = System.nanoTime();
    +
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        try {
    +            publisher = getPublisherBuilder(context, flowFile).build();
    +
    +            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +            session.exportTo(flowFile, baos);
    +            final ByteString flowFileContent = 
ByteString.copyFromUtf8(baos.toString());
    +
    +            PubsubMessage message = 
PubsubMessage.newBuilder().setData(flowFileContent)
    +                    .setPublishTime(Timestamp.newBuilder().build())
    +                    .putAllAttributes(getDynamicAttributesMap(context, 
flowFile))
    +                    .build();
    +
    +            ApiFuture<String> messageIdFuture = publisher.publish(message);
    +
    +            while (messageIdFuture.isDone()) {
    +                Thread.sleep(1000L);
    +            }
    +
    +            final String messageId = messageIdFuture.get();
    +
    +            attributes.put(MESSAGE_ID_ATTRIBUTE, messageId);
    +            attributes.put(TOPIC_NAME_ATTRIBUTE, getTopicName(context, 
flowFile).toString());
    +
    +            flowFile = session.putAllAttributes(flowFile, attributes);
    +        } catch (IOException e) {
    +            getLogger().error("Routing to 'failure'. Failed to build the 
Google Cloud PubSub Publisher due to ", e);
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        } catch (InterruptedException | ExecutionException e) {
    +            getLogger().error("Routing to 'retry'. Failed to publish the 
message to Google PubSub topic due to ", e);
    +            session.transfer(flowFile, REL_RETRY);
    +            return;
    +        } finally {
    +            shutdownPublisher();
    --- End diff --
    
    Actually, I previously had the ExpressionLanguageScope for `TOPIC_NAME` set to 
`VariableRegistry` and shut down the publisher in `unScheduled`, since that would 
be more efficient, but the GCP SDK kept writing WARN and ERROR log messages 
saying "shutdown was not called and for proper resource utilization, call 
shutdown()". That's why I had to go with this approach. I'll try to check with 
the Google folks; there is also this `TransportChannel` API, and I have to check 
whether it offers anything of interest in this regard.


> Create a processor to Publish and Subscribe to Google Pub/Sub
> -------------------------------------------------------------
>
>                 Key: NIFI-5133
>                 URL: https://issues.apache.org/jira/browse/NIFI-5133
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 1.6.0
>            Reporter: Abdelkrim Hadjidj
>            Assignee: Sivaprasanna Sethuraman
>            Priority: Major
>
> As a workflow designer, I would like to publish/subscribe messages to/from 
> Google Pub/Sub. This integration can enable several ingestion and real-time 
> use cases where data is available on Google Cloud.
> There are a few options that are outside the NiFi project:
> [https://github.com/ammitt90/nifi-pubsub-processor]
> [https://github.com/synack/nifi-gcp-pubsub-publisher]
> [https://github.com/synack/nifi-gcp-pubsub-consumer]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to