mattyb149 commented on code in PR #9086:
URL: https://github.com/apache/nifi/pull/9086#discussion_r1686880548


##########
nifi-extension-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/GetSlackReaction.java:
##########
@@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.slack;
+
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.PathNotFoundException;
+import com.slack.api.bolt.App;
+import com.slack.api.bolt.AppConfig;
+import com.slack.api.methods.MethodsClient;
+import com.slack.api.methods.SlackApiException;
+import com.slack.api.methods.request.reactions.ReactionsGetRequest;
+import com.slack.api.methods.response.reactions.ReactionsGetResponse;
+import com.slack.api.model.Reaction;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSettings;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.documentation.UseCase;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.slack.util.RateLimit;
+import org.apache.nifi.processors.slack.util.SlackResponseUtil;
+
+import static com.jayway.jsonpath.Option.ALWAYS_RETURN_LIST;
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.nifi.flowfile.attributes.CoreAttributes.FILENAME;
+import static org.apache.nifi.flowfile.attributes.CoreAttributes.MIME_TYPE;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@ReadsAttributes({
+  @ReadsAttribute(attribute = GetSlackReaction.ATTR_CHANNEL_ID, description = 
"If set, the ID of the channel where the message was sent is taken from this 
attribute."),
+  @ReadsAttribute(attribute = GetSlackReaction.ATTR_MESSAGE_TIMESTAMP, 
description = "If set, the message timestamp is taken from this attribute.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "slack.reaction.<emoji name>", 
description = "The name of the reaction (emoji) and the reaction count that was 
provided for the message."),
+        @WritesAttribute(attribute = GetSlackReaction.ATTR_ERROR_MESSAGE, 
description = "The error message on fetching reactions."),
+        @WritesAttribute(attribute = GetSlackReaction.ATTR_MESSAGE_TIMESTAMP, 
description = "The ID of the Slack Channel where the message the reactions are 
fetched from."),
+        @WritesAttribute(attribute = GetSlackReaction.ATTR_CHANNEL_ID, 
description = "The timestamp of the Slack message the reactions are fetched 
for."),
+        @WritesAttribute(attribute = 
GetSlackReaction.ATTR_REACTION_POLLING_SECONDS, description = "The time in 
seconds the processor waited for reactions.")
+})
+@SeeAlso({ListenSlack.class, ConsumeSlack.class, PublishSlack.class})
+@Tags({"slack", "conversation", "reactions.get", "social media", "emoji"})
+@CapabilityDescription("Retrieves reactions for (a) given message(s). The 
reactions are written as Flow File attributes." +
+        "ConsumeSlack, ListenSlack or PublishSlack processor should be used 
before GetSlackReaction.")
+@DefaultSettings(penaltyDuration = "5 min")
+@UseCase(
+        description = "Fetch the reactions for a specific message that was 
earlier published by PublishSlack to a channel",
+        configuration = """
+            Set "Access Token" to the value of your Slack OAuth Access Token.
+            Set 'Message Identifier Strategy' to 'Attributes' (channel ID and 
message timestamp will be taken from attributes set by PublishSlack)
+            Set "Release Per Reaction" to 'true' if you wait for the first 
reaction(s) only, in case want to collect all reactions in a given waiting 
period,
+            set this property to 'false'.
+            Set "Wait Period" to the time period the processor will continue 
to search for reactions for the specified message (unless releasing per 
reaction)
+            """
+)
+@UseCase(
+        description = "Fetch the reactions for messages fetched by 
ConsumeSlack/ListenSlack",
+        configuration = """
+            Set "Access Token" to the value of your Slack OAuth Access Token.
+            Set 'Message Identifier Strategy' to 'JSON Path' (channel ID and 
message timestamp will be taken from the JSON output of 
ConsumeSlack/ListenSlack)
+            You can leave "Channel ID JSON Path", "Message Timestamp JSON 
Path" and "Message Text JSON Path" on default values.
+            Set "Release Per Reaction" to 'true' if you wait for the first 
reaction(s) only, in case want to collect all reactions in a given waiting 
period,
+            set this property to 'false'.
+            Set "Wait Period" to the time period the processor will continue 
to search for reactions for the specified message (unless releasing per 
reaction)
+            """
+)
+public class GetSlackReaction extends AbstractProcessor {
+    public static final String ATTR_MESSAGE_TIMESTAMP = "slack.ts";
+    public static final String ATTR_CHANNEL_ID = "slack.channel.id";
+    public static final String ATTR_ERROR_MESSAGE = "error.message";
+    public static final String ATTR_FIRST_REACTION_CHECK_TIMESTAMP = 
"first.reaction.check.ts";
+    public static final String ATTR_LAST_REACTION_CHECK_TIMESTAMP = 
"last.reaction.check.ts";
+    public static final String ATTR_REACTION_POLLING_SECONDS = 
"reaction.polling.seconds";
+    public static final String ATTR_SLACK_MESSAGE_COUNT = 
"slack.message.count";
+    public static final int MIN_REACTION_POLLING_FREQUENCY_SECONDS = 10;
+
+
+    static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
+            .name("Access Token")
+            .description("OAuth Access Token used for 
authenticating/authorizing the Slack request sent by NiFi. This may be either a 
User Token or a Bot Token. " +
+                    "It must be granted the reactions:read scope.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .sensitive(true)
+            .build();
+
+    static final PropertyDescriptor MESSAGE_IDENTIFIER_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Message Identifier Strategy")
+            .description("Specifies the strategy to obtain the message 
timestamp and channel id from incoming Flow File. "
+             + "In case of '" + 
MessageIdentifierStrategy.ATTRIBUTES.getValue() + "' the 'slack.channel.id' and 
'slack.ts' Flow File attributes will be used "
+             + "and in case of '" + 
MessageIdentifierStrategy.JSON_PATH.getValue() + "' additional JSON path values 
needs to be specified to fetch channel id "
+             + "and message timestamp from incoming JSON content.")
+            .required(true)
+            .allowableValues(MessageIdentifierStrategy.class)
+            .defaultValue(MessageIdentifierStrategy.ATTRIBUTES.getValue())
+            .build();
+
+    static final PropertyDescriptor CHANNEL_ID_JSON_PATH = new 
PropertyDescriptor.Builder()
+            .name("Channel ID JSON Path")
+            .description("The JSON Path which identifies the channel ID in the 
incoming JSON.")

Review Comment:
   Should we require input to be in JSON format? It might be more flexible (and 
facilitate a future ConsumeSlackRecord processor for example) to use Record 
Path and a Record Reader?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to