[ https://issues.apache.org/jira/browse/NIFI-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16484020#comment-16484020 ]
ASF GitHub Bot commented on NIFI-5133: -------------------------------------- Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2724#discussion_r189903461 --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java --- @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.gcp.pubsub; + +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub; +import com.google.cloud.pubsub.v1.stub.SubscriberStub; +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; +import com.google.common.collect.ImmutableList; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_DESCRIPTION; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE; +import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE; +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION; + +@SeeAlso({PublishGCPubSub.class}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"google", "google-cloud", "pubsub", "consume"}) +@CapabilityDescription("Consumes message from the configured Google Cloud PubSub subscription. 
If the 'Batch Size' is set, " + + "the configured number of messages will be pulled in a single request, else only one message will be pulled.") +@WritesAttributes({ + @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description = ACK_ID_DESCRIPTION), + @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE, description = SERIALIZED_SIZE_DESCRIPTION), + @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, description = MSG_ATTRIBUTES_COUNT_DESCRIPTION), + @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = MSG_PUBLISH_TIME_DESCRIPTION), + @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = DYNAMIC_ATTRIBUTES_DESCRIPTION) +}) +public class ConsumeGCPubSub extends AbstractGCPubSubProcessor { + + public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder() + .name("gcp-pubsub-subscription") + .displayName("Subscription") + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .description("Subscription name of the Google Cloud Pub/Sub") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + private SubscriberStub subscriber = null; + private PullRequest pullRequest; + + @OnScheduled + public void onScheduled(ProcessContext context) { + final Integer batchSize = (context.getProperty(BATCH_SIZE).isSet()) ? context.getProperty(BATCH_SIZE).asInteger() : 1; --- End diff -- to be changed if my previous comment makes sense > Create a processor to Publish and Subscribe to Google Pub/Sub > ------------------------------------------------------------- > > Key: NIFI-5133 > URL: https://issues.apache.org/jira/browse/NIFI-5133 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions > Affects Versions: 1.6.0 > Reporter: Abdelkrim Hadjidj > Assignee: Sivaprasanna Sethuraman > Priority: Major > > As a workflow designer, I would like to publish/subscribe messages to/from > Google Pub/Sub. 
This integration can enable several ingestion and real-time > use cases where data is available on Google Cloud. > There are a few options that are outside the NiFi project: > [https://github.com/ammitt90/nifi-pubsub-processor] > [https://github.com/synack/nifi-gcp-pubsub-publisher] > [https://github.com/synack/nifi-gcp-pubsub-consumer] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)