mattyb149 commented on code in PR #9086: URL: https://github.com/apache/nifi/pull/9086#discussion_r1686880548
########## nifi-extension-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/GetSlackReaction.java: ########## @@ -0,0 +1,524 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.slack; + +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.PathNotFoundException; +import com.slack.api.bolt.App; +import com.slack.api.bolt.AppConfig; +import com.slack.api.methods.MethodsClient; +import com.slack.api.methods.SlackApiException; +import com.slack.api.methods.request.reactions.ReactionsGetRequest; +import com.slack.api.methods.response.reactions.ReactionsGetResponse; +import com.slack.api.model.Reaction; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.DescribedValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.slack.util.RateLimit; +import org.apache.nifi.processors.slack.util.SlackResponseUtil; + +import static com.jayway.jsonpath.Option.ALWAYS_RETURN_LIST; +import static java.lang.String.format; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.nifi.flowfile.attributes.CoreAttributes.FILENAME; +import static org.apache.nifi.flowfile.attributes.CoreAttributes.MIME_TYPE; + +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@ReadsAttributes({ + @ReadsAttribute(attribute = GetSlackReaction.ATTR_CHANNEL_ID, description = "If set, the ID of the channel where the message was sent is taken from this attribute."), + @ReadsAttribute(attribute = GetSlackReaction.ATTR_MESSAGE_TIMESTAMP, description = "If set, the message timestamp is taken from this attribute.") +}) +@WritesAttributes({ + @WritesAttribute(attribute = "slack.reaction.<emoji name>", description = "The name of the reaction (emoji) and the reaction count that was provided for the message."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_ERROR_MESSAGE, description = "The error message on fetching reactions."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_MESSAGE_TIMESTAMP, description = "The ID of the Slack Channel where the message the reactions are fetched from."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_CHANNEL_ID, description = "The timestamp of the Slack message the reactions are fetched for."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_REACTION_POLLING_SECONDS, description = "The time in seconds the processor waited for reactions.") +}) +@SeeAlso({ListenSlack.class, ConsumeSlack.class, PublishSlack.class}) +@Tags({"slack", "conversation", "reactions.get", "social media", "emoji"}) +@CapabilityDescription("Retrieves reactions for (a) given message(s). The reactions are written as Flow File attributes." + + "ConsumeSlack, ListenSlack or PublishSlack processor should be used before GetSlackReaction.") +@DefaultSettings(penaltyDuration = "5 min") +@UseCase( + description = "Fetch the reactions for a specific message that was earlier published by PublishSlack to a channel", + configuration = """ + Set "Access Token" to the value of your Slack OAuth Access Token. + Set 'Message Identifier Strategy' to 'Attributes' (channel ID and message timestamp will be taken from attributes set by PublishSlack) + Set "Release Per Reaction" to 'true' if you wait for the first reaction(s) only, in case want to collect all reactions in a given waiting period, + set this property to 'false'. + Set "Wait Period" to the time period the processor will continue to search for reactions for the specified message (unless releasing per reaction) + """ +) +@UseCase( + description = "Fetch the reactions for messages fetched by ConsumeSlack/ListenSlack", + configuration = """ + Set "Access Token" to the value of your Slack OAuth Access Token. + Set 'Message Identifier Strategy' to 'JSON Path' (channel ID and message timestamp will be taken from the JSON output of ConsumeSlack/ListenSlack) + You can leave "Channel ID JSON Path", "Message Timestamp JSON Path" and "Message Text JSON Path" on default values. + Set "Release Per Reaction" to 'true' if you wait for the first reaction(s) only, in case want to collect all reactions in a given waiting period, + set this property to 'false'. + Set "Wait Period" to the time period the processor will continue to search for reactions for the specified message (unless releasing per reaction) + """ +) +public class GetSlackReaction extends AbstractProcessor { + public static final String ATTR_MESSAGE_TIMESTAMP = "slack.ts"; + public static final String ATTR_CHANNEL_ID = "slack.channel.id"; + public static final String ATTR_ERROR_MESSAGE = "error.message"; + public static final String ATTR_FIRST_REACTION_CHECK_TIMESTAMP = "first.reaction.check.ts"; + public static final String ATTR_LAST_REACTION_CHECK_TIMESTAMP = "last.reaction.check.ts"; + public static final String ATTR_REACTION_POLLING_SECONDS = "reaction.polling.seconds"; + public static final String ATTR_SLACK_MESSAGE_COUNT = "slack.message.count"; + public static final int MIN_REACTION_POLLING_FREQUENCY_SECONDS = 10; + + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("Access Token") + .description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi. This may be either a User Token or a Bot Token. " + + "It must be granted the reactions:read scope.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .sensitive(true) + .build(); + + static final PropertyDescriptor MESSAGE_IDENTIFIER_STRATEGY = new PropertyDescriptor.Builder() + .name("Message Identifier Strategy") + .description("Specifies the strategy to obtain the message timestamp and channel id from incoming Flow File. " + + "In case of '" + MessageIdentifierStrategy.ATTRIBUTES.getValue() + "' the 'slack.channel.id' and 'slack.ts' Flow File attributes will be used " + + "and in case of '" + MessageIdentifierStrategy.JSON_PATH.getValue() + "' additional JSON path values needs to be specified to fetch channel id " + + "and message timestamp from incoming JSON content.") + .required(true) + .allowableValues(MessageIdentifierStrategy.class) + .defaultValue(MessageIdentifierStrategy.ATTRIBUTES.getValue()) + .build(); + + static final PropertyDescriptor CHANNEL_ID_JSON_PATH = new PropertyDescriptor.Builder() + .name("Channel ID JSON Path") + .description("The JSON Path which identifies the channel ID in the incoming JSON.") Review Comment: Should we require input to be in JSON format? It might be more flexible (and facilitate a future ConsumeSlackRecord processor for example) to use Record Path and a Record Reader? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
