[ 
https://issues.apache.org/jira/browse/NIFI-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16484020#comment-16484020
 ] 

ASF GitHub Bot commented on NIFI-5133:
--------------------------------------

Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2724#discussion_r189903461
  
    --- Diff: 
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
 ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.gcp.pubsub;
    +
    +import com.google.api.gax.core.FixedCredentialsProvider;
    +import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
    +import com.google.cloud.pubsub.v1.stub.SubscriberStub;
    +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
    +import com.google.common.collect.ImmutableList;
    +import com.google.pubsub.v1.AcknowledgeRequest;
    +import com.google.pubsub.v1.ProjectSubscriptionName;
    +import com.google.pubsub.v1.PullRequest;
    +import com.google.pubsub.v1.PullResponse;
    +import com.google.pubsub.v1.ReceivedMessage;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
    +import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_DESCRIPTION;
    +import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
    +import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION;
    +import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
    +import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
    +import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION;
    +import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
    +import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
    +import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
    +import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION;
    +
    +@SeeAlso({PublishGCPubSub.class})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@Tags({"google", "google-cloud", "pubsub", "consume"})
    +@CapabilityDescription("Consumes message from the configured Google Cloud 
PubSub subscription. If the 'Batch Size' is set, " +
    +        "the configured number of messages will be pulled in a single 
request, else only one message will be pulled.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description = 
ACK_ID_DESCRIPTION),
    +        @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE, 
description = SERIALIZED_SIZE_DESCRIPTION),
    +        @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, 
description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
    +        @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, 
description = MSG_PUBLISH_TIME_DESCRIPTION),
    +        @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, 
description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
    +})
    +public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
    +
    +    public static final PropertyDescriptor SUBSCRIPTION = new 
PropertyDescriptor.Builder()
    +            .name("gcp-pubsub-subscription")
    +            .displayName("Subscription")
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            .description("Subscription name of the Google Cloud Pub/Sub")
    +            .required(true)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .build();
    +
    +    private SubscriberStub subscriber = null;
    +    private PullRequest pullRequest;
    +
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        final Integer batchSize = 
(context.getProperty(BATCH_SIZE).isSet()) ? 
context.getProperty(BATCH_SIZE).asInteger() : 1;
    --- End diff --
    
    This is to be changed if my previous comment makes sense.


> Create a processor to Publish and Subscribe to Google Pub/Sub
> -------------------------------------------------------------
>
>                 Key: NIFI-5133
>                 URL: https://issues.apache.org/jira/browse/NIFI-5133
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 1.6.0
>            Reporter: Abdelkrim Hadjidj
>            Assignee: Sivaprasanna Sethuraman
>            Priority: Major
>
> As a workflow designer, I would like to publish/subscribe messages to/from 
> Google Pub/Sub. This integration can enable several ingestion and real-time 
> use cases where data is available on Google Cloud.
> There are a few options outside the NiFi project:
> [https://github.com/ammitt90/nifi-pubsub-processor]
> [https://github.com/synack/nifi-gcp-pubsub-publisher]
> [https://github.com/synack/nifi-gcp-pubsub-consumer]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to