joewitt commented on code in PR #9010: URL: https://github.com/apache/nifi/pull/9010#discussion_r1659382744
########## nifi-extension-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/GetSlackReaction.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.slack; + +import com.slack.api.bolt.App; +import com.slack.api.bolt.AppConfig; +import com.slack.api.methods.MethodsClient; +import com.slack.api.methods.SlackApiException; +import com.slack.api.methods.request.reactions.ReactionsGetRequest; +import com.slack.api.methods.response.reactions.ReactionsGetResponse; +import com.slack.api.model.Reaction; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.slack.util.RateLimit; +import org.apache.nifi.processors.slack.util.SlackResponseUtil; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = GetSlackReaction.ATTR_CHANNEL_ID, description = "The ID of the Slack Channel from which the messages were retrieved"), + @WritesAttribute(attribute = GetSlackReaction.ATTR_MESSAGE_TIMESTAMP, description = "The timestamp of the Slack message the reactions are fetched for."), + @WritesAttribute(attribute = "slack.reaction.<emoji name>", description = "The name of the emoji and the reaction count that was provided for the message."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_WAIT_TIME, description = "The total number of minutes waited while the reactions were captured."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_ERROR_MESSAGE, description = "The error message on fetching reactions.") +}) +@SeeAlso({ListenSlack.class, ConsumeSlack.class, PublishSlack.class}) +@Tags({"slack", "conversation", "reactions.get", "social media", "emoji"}) +@CapabilityDescription("Retrieves reactions for a given message. The reactions are written as attributes.") +@DefaultSettings(penaltyDuration = "5 min") +public class GetSlackReaction extends AbstractProcessor { + public static final String ATTR_WAIT_TIME = "minutes.waited"; + public static final String ATTR_ERROR_MESSAGE = "error.message"; + public static final String ATTR_MESSAGE_TIMESTAMP = "slack.message.timestamp"; + public static final String ATTR_CHANNEL_ID = "slack.channel.id"; + public static final int PENALTY_MINUTES = 5; + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("Access Token") + .description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi. This may be either a User Token or a Bot Token. " + + "It must be granted the reactions:read scope and channels:history, groups:history, im:history or mpim:history, depending on the type of conversation being used.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .sensitive(true) + .build(); + + static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder() Review Comment: What is the use case for which a user would manually configure the processor to look at a specific channel id? ########## nifi-extension-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/GetSlackReaction.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.slack; + +import com.slack.api.bolt.App; +import com.slack.api.bolt.AppConfig; +import com.slack.api.methods.MethodsClient; +import com.slack.api.methods.SlackApiException; +import com.slack.api.methods.request.reactions.ReactionsGetRequest; +import com.slack.api.methods.response.reactions.ReactionsGetResponse; +import com.slack.api.model.Reaction; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.slack.util.RateLimit; +import org.apache.nifi.processors.slack.util.SlackResponseUtil; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = GetSlackReaction.ATTR_CHANNEL_ID, description = "The ID of the Slack Channel from which the messages were retrieved"), + @WritesAttribute(attribute = GetSlackReaction.ATTR_MESSAGE_TIMESTAMP, description = "The timestamp of the Slack message the reactions are fetched for."), + @WritesAttribute(attribute = "slack.reaction.<emoji name>", description = "The name of the emoji and the reaction count that was provided for the message."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_WAIT_TIME, description = "The total number of minutes waited while the reactions were captured."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_ERROR_MESSAGE, description = "The error message on fetching reactions.") +}) +@SeeAlso({ListenSlack.class, ConsumeSlack.class, PublishSlack.class}) +@Tags({"slack", "conversation", "reactions.get", "social media", "emoji"}) +@CapabilityDescription("Retrieves reactions for a given message. The reactions are written as attributes.") +@DefaultSettings(penaltyDuration = "5 min") +public class GetSlackReaction extends AbstractProcessor { + public static final String ATTR_WAIT_TIME = "minutes.waited"; + public static final String ATTR_ERROR_MESSAGE = "error.message"; + public static final String ATTR_MESSAGE_TIMESTAMP = "slack.message.timestamp"; + public static final String ATTR_CHANNEL_ID = "slack.channel.id"; + public static final int PENALTY_MINUTES = 5; + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("Access Token") + .description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi. This may be either a User Token or a Bot Token. " + + "It must be granted the reactions:read scope and channels:history, groups:history, im:history or mpim:history, depending on the type of conversation being used.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .sensitive(true) + .build(); + + static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder() + .name("Channel ID") + .description("The ID of the channel.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor THREAD_TIMESTAMP = new PropertyDescriptor.Builder() + .name("Message Timestamp") Review Comment: what is the use case for which a user would look at a specific message timestamp in the processor configuration itself? ########## nifi-extension-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/GetSlackReaction.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.slack; + +import com.slack.api.bolt.App; +import com.slack.api.bolt.AppConfig; +import com.slack.api.methods.MethodsClient; +import com.slack.api.methods.SlackApiException; +import com.slack.api.methods.request.reactions.ReactionsGetRequest; +import com.slack.api.methods.response.reactions.ReactionsGetResponse; +import com.slack.api.model.Reaction; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.slack.util.RateLimit; +import org.apache.nifi.processors.slack.util.SlackResponseUtil; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = GetSlackReaction.ATTR_CHANNEL_ID, description = "The ID of the Slack Channel from which the messages were retrieved"), + @WritesAttribute(attribute = GetSlackReaction.ATTR_MESSAGE_TIMESTAMP, description = "The timestamp of the Slack message the reactions are fetched for."), + @WritesAttribute(attribute = "slack.reaction.<emoji name>", description = "The name of the emoji and the reaction count that was provided for the message."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_WAIT_TIME, description = "The total number of minutes waited while the reactions were captured."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_ERROR_MESSAGE, description = "The error message on fetching reactions.") +}) +@SeeAlso({ListenSlack.class, ConsumeSlack.class, PublishSlack.class}) +@Tags({"slack", "conversation", "reactions.get", "social media", "emoji"}) +@CapabilityDescription("Retrieves reactions for a given message. The reactions are written as attributes.") +@DefaultSettings(penaltyDuration = "5 min") +public class GetSlackReaction extends AbstractProcessor { + public static final String ATTR_WAIT_TIME = "minutes.waited"; + public static final String ATTR_ERROR_MESSAGE = "error.message"; + public static final String ATTR_MESSAGE_TIMESTAMP = "slack.message.timestamp"; + public static final String ATTR_CHANNEL_ID = "slack.channel.id"; + public static final int PENALTY_MINUTES = 5; + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("Access Token") + .description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi. This may be either a User Token or a Bot Token. " + + "It must be granted the reactions:read scope and channels:history, groups:history, im:history or mpim:history, depending on the type of conversation being used.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .sensitive(true) + .build(); + + static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder() + .name("Channel ID") + .description("The ID of the channel.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor THREAD_TIMESTAMP = new PropertyDescriptor.Builder() + .name("Message Timestamp") + .description("Slack message's timestamp the reactions are fetched for.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor RELEASE_IF_ONE_REACTION = new PropertyDescriptor.Builder() Review Comment: RELEASE_PER_REACTION is probably a clearer naming ########## nifi-extension-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/GetSlackReaction.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.slack; + +import com.slack.api.bolt.App; +import com.slack.api.bolt.AppConfig; +import com.slack.api.methods.MethodsClient; +import com.slack.api.methods.SlackApiException; +import com.slack.api.methods.request.reactions.ReactionsGetRequest; +import com.slack.api.methods.response.reactions.ReactionsGetResponse; +import com.slack.api.model.Reaction; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.slack.util.RateLimit; +import org.apache.nifi.processors.slack.util.SlackResponseUtil; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = GetSlackReaction.ATTR_CHANNEL_ID, description = "The ID of the Slack Channel from which the messages were retrieved"), + @WritesAttribute(attribute = GetSlackReaction.ATTR_MESSAGE_TIMESTAMP, description = "The timestamp of the Slack message the reactions are fetched for."), + @WritesAttribute(attribute = "slack.reaction.<emoji name>", description = "The name of the emoji and the reaction count that was provided for the message."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_WAIT_TIME, description = "The total number of minutes waited while the reactions were captured."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_ERROR_MESSAGE, description = "The error message on fetching reactions.") +}) +@SeeAlso({ListenSlack.class, ConsumeSlack.class, PublishSlack.class}) +@Tags({"slack", "conversation", "reactions.get", "social media", "emoji"}) +@CapabilityDescription("Retrieves reactions for a given message. The reactions are written as attributes.") +@DefaultSettings(penaltyDuration = "5 min") +public class GetSlackReaction extends AbstractProcessor { + public static final String ATTR_WAIT_TIME = "minutes.waited"; + public static final String ATTR_ERROR_MESSAGE = "error.message"; + public static final String ATTR_MESSAGE_TIMESTAMP = "slack.message.timestamp"; + public static final String ATTR_CHANNEL_ID = "slack.channel.id"; + public static final int PENALTY_MINUTES = 5; + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("Access Token") + .description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi. This may be either a User Token or a Bot Token. " + + "It must be granted the reactions:read scope and channels:history, groups:history, im:history or mpim:history, depending on the type of conversation being used.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .sensitive(true) + .build(); + + static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder() + .name("Channel ID") + .description("The ID of the channel.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor THREAD_TIMESTAMP = new PropertyDescriptor.Builder() + .name("Message Timestamp") + .description("Slack message's timestamp the reactions are fetched for.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor RELEASE_IF_ONE_REACTION = new PropertyDescriptor.Builder() + .name("Release If One Reaction Arrived") + .description("It is possible to wait for the first reaction or wait for the specified amount of time to fetch all possible reactions.") Review Comment: Possibly clearer description "If true the flowfile will be released each time a reaction is found for the specified message" ########## nifi-extension-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/GetSlackReaction.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.slack; + +import com.slack.api.bolt.App; +import com.slack.api.bolt.AppConfig; +import com.slack.api.methods.MethodsClient; +import com.slack.api.methods.SlackApiException; +import com.slack.api.methods.request.reactions.ReactionsGetRequest; +import com.slack.api.methods.response.reactions.ReactionsGetResponse; +import com.slack.api.model.Reaction; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.slack.util.RateLimit; +import org.apache.nifi.processors.slack.util.SlackResponseUtil; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = GetSlackReaction.ATTR_CHANNEL_ID, description = "The ID of the Slack Channel from which the messages were retrieved"), + @WritesAttribute(attribute = GetSlackReaction.ATTR_MESSAGE_TIMESTAMP, description = "The timestamp of the Slack message the reactions are fetched for."), + @WritesAttribute(attribute = "slack.reaction.<emoji name>", description = "The name of the emoji and the reaction count that was provided for the message."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_WAIT_TIME, description = "The total number of minutes waited while the reactions were captured."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_ERROR_MESSAGE, description = "The error message on fetching reactions.") +}) +@SeeAlso({ListenSlack.class, ConsumeSlack.class, PublishSlack.class}) +@Tags({"slack", "conversation", "reactions.get", "social media", "emoji"}) +@CapabilityDescription("Retrieves reactions for a given message. The reactions are written as attributes.") +@DefaultSettings(penaltyDuration = "5 min") +public class GetSlackReaction extends AbstractProcessor { + public static final String ATTR_WAIT_TIME = "minutes.waited"; + public static final String ATTR_ERROR_MESSAGE = "error.message"; + public static final String ATTR_MESSAGE_TIMESTAMP = "slack.message.timestamp"; + public static final String ATTR_CHANNEL_ID = "slack.channel.id"; + public static final int PENALTY_MINUTES = 5; + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("Access Token") + .description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi. This may be either a User Token or a Bot Token. " + + "It must be granted the reactions:read scope and channels:history, groups:history, im:history or mpim:history, depending on the type of conversation being used.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .sensitive(true) + .build(); + + static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder() + .name("Channel ID") + .description("The ID of the channel.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor THREAD_TIMESTAMP = new PropertyDescriptor.Builder() + .name("Message Timestamp") + .description("Slack message's timestamp the reactions are fetched for.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor RELEASE_IF_ONE_REACTION = new PropertyDescriptor.Builder() + .name("Release If One Reaction Arrived") + .description("It is possible to wait for the first reaction or wait for the specified amount of time to fetch all possible reactions.") + .required(true) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("true") + .build(); + + static final PropertyDescriptor WAIT_MONITOR_WINDOW = new PropertyDescriptor.Builder() Review Comment: A clearer name might be 'WAIT_PERIOD" And the description would be "The period of time a processor will continue to search for reactions for the specified message" And this property should only be shown conditionally and based on "RELEASE_PER_REACTION" being false. ########## nifi-extension-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/GetSlackReaction.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.slack; + +import com.slack.api.bolt.App; +import com.slack.api.bolt.AppConfig; +import com.slack.api.methods.MethodsClient; +import com.slack.api.methods.SlackApiException; +import com.slack.api.methods.request.reactions.ReactionsGetRequest; +import com.slack.api.methods.response.reactions.ReactionsGetResponse; +import com.slack.api.model.Reaction; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.slack.util.RateLimit; +import org.apache.nifi.processors.slack.util.SlackResponseUtil; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = GetSlackReaction.ATTR_CHANNEL_ID, description = "The ID of the Slack Channel from which the messages were retrieved"), + @WritesAttribute(attribute = GetSlackReaction.ATTR_MESSAGE_TIMESTAMP, description = "The timestamp of the Slack message the reactions are fetched for."), + @WritesAttribute(attribute = "slack.reaction.<emoji name>", description = "The name of the emoji and the reaction count that was provided for the message."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_WAIT_TIME, description = "The total number of minutes waited while the reactions were captured."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_ERROR_MESSAGE, description = "The error message on fetching reactions.") +}) +@SeeAlso({ListenSlack.class, ConsumeSlack.class, PublishSlack.class}) +@Tags({"slack", "conversation", "reactions.get", "social media", "emoji"}) +@CapabilityDescription("Retrieves reactions for a given message. The reactions are written as attributes.") +@DefaultSettings(penaltyDuration = "5 min") +public class GetSlackReaction extends AbstractProcessor { + public static final String ATTR_WAIT_TIME = "minutes.waited"; + public static final String ATTR_ERROR_MESSAGE = "error.message"; + public static final String ATTR_MESSAGE_TIMESTAMP = "slack.message.timestamp"; + public static final String ATTR_CHANNEL_ID = "slack.channel.id"; + public static final int PENALTY_MINUTES = 5; + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("Access Token") + .description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi. This may be either a User Token or a Bot Token. " + + "It must be granted the reactions:read scope and channels:history, groups:history, im:history or mpim:history, depending on the type of conversation being used.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .sensitive(true) + .build(); + + static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder() + .name("Channel ID") + .description("The ID of the channel.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor THREAD_TIMESTAMP = new PropertyDescriptor.Builder() + .name("Message Timestamp") + .description("Slack message's timestamp the reactions are fetched for.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor RELEASE_IF_ONE_REACTION = new PropertyDescriptor.Builder() + .name("Release If One Reaction Arrived") + .description("It is possible to wait for the first reaction or wait for the specified amount of time to fetch all possible reactions.") + .required(true) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("true") + .build(); + + static final PropertyDescriptor WAIT_MONITOR_WINDOW = new PropertyDescriptor.Builder() + .name("Wait Monitor Window") + .description("Processor will periodically poll reactions for the given time duration if no reaction arrived yet or " + + RELEASE_IF_ONE_REACTION.getDisplayName() + " is set to false.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("1 hour") + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles are routed to this relationship if fetching reactions were successful") Review Comment: Messages are routed to this relationship when reactions are found and either the wait period has elapsed or releasing per reaction is configured. ########## nifi-extension-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/GetSlackReaction.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.slack; + +import com.slack.api.bolt.App; +import com.slack.api.bolt.AppConfig; +import com.slack.api.methods.MethodsClient; +import com.slack.api.methods.SlackApiException; +import com.slack.api.methods.request.reactions.ReactionsGetRequest; +import com.slack.api.methods.response.reactions.ReactionsGetResponse; +import com.slack.api.model.Reaction; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.slack.util.RateLimit; +import org.apache.nifi.processors.slack.util.SlackResponseUtil; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = GetSlackReaction.ATTR_CHANNEL_ID, description = "The ID of the Slack Channel from which the messages were retrieved"), + @WritesAttribute(attribute = GetSlackReaction.ATTR_MESSAGE_TIMESTAMP, description = "The timestamp of the Slack message the reactions are fetched for."), + @WritesAttribute(attribute = "slack.reaction.<emoji name>", description = "The name of the emoji and the reaction count that was provided for the message."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_WAIT_TIME, description = "The total number of minutes waited while the reactions were captured."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_ERROR_MESSAGE, description = "The error message on fetching reactions.") +}) +@SeeAlso({ListenSlack.class, ConsumeSlack.class, PublishSlack.class}) +@Tags({"slack", "conversation", "reactions.get", "social media", "emoji"}) +@CapabilityDescription("Retrieves reactions for a given message. The reactions are written as attributes.") +@DefaultSettings(penaltyDuration = "5 min") +public class GetSlackReaction extends AbstractProcessor { + public static final String ATTR_WAIT_TIME = "minutes.waited"; + public static final String ATTR_ERROR_MESSAGE = "error.message"; + public static final String ATTR_MESSAGE_TIMESTAMP = "slack.message.timestamp"; + public static final String ATTR_CHANNEL_ID = "slack.channel.id"; + public static final int PENALTY_MINUTES = 5; + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("Access Token") + .description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi. This may be either a User Token or a Bot Token. " + + "It must be granted the reactions:read scope and channels:history, groups:history, im:history or mpim:history, depending on the type of conversation being used.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .sensitive(true) + .build(); + + static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder() + .name("Channel ID") + .description("The ID of the channel.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor THREAD_TIMESTAMP = new PropertyDescriptor.Builder() + .name("Message Timestamp") + .description("Slack message's timestamp the reactions are fetched for.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor RELEASE_IF_ONE_REACTION = new PropertyDescriptor.Builder() + .name("Release If One Reaction Arrived") + .description("It is possible to wait for the first reaction or wait for the specified amount of time to fetch all possible reactions.") + .required(true) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("true") + .build(); + + static final PropertyDescriptor WAIT_MONITOR_WINDOW = new PropertyDescriptor.Builder() + .name("Wait Monitor Window") + .description("Processor will periodically poll reactions for the given time duration if no reaction arrived yet or " + + RELEASE_IF_ONE_REACTION.getDisplayName() + " is set to false.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("1 hour") + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles are routed to this relationship if fetching reactions were successful") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles are routed to this relationship if an error occurred when fetching reactions") Review Comment: Does this failure mean the user should self-loop and potentiallly try again or should they assume it means the message isn't found and will never be found? Probably good to advise the user of what failure means ########## nifi-extension-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/GetSlackReaction.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.slack; + +import com.slack.api.bolt.App; +import com.slack.api.bolt.AppConfig; +import com.slack.api.methods.MethodsClient; +import com.slack.api.methods.SlackApiException; +import com.slack.api.methods.request.reactions.ReactionsGetRequest; +import com.slack.api.methods.response.reactions.ReactionsGetResponse; +import com.slack.api.model.Reaction; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.slack.util.RateLimit; +import org.apache.nifi.processors.slack.util.SlackResponseUtil; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = GetSlackReaction.ATTR_CHANNEL_ID, description = "The ID of the Slack Channel from which the messages were retrieved"), + @WritesAttribute(attribute = GetSlackReaction.ATTR_MESSAGE_TIMESTAMP, description = "The timestamp of the Slack message the reactions are fetched for."), + @WritesAttribute(attribute = "slack.reaction.<emoji name>", description = "The name of the emoji and the reaction count that was provided for the message."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_WAIT_TIME, description = "The total number of minutes waited while the reactions were captured."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_ERROR_MESSAGE, description = "The error message on fetching reactions.") +}) +@SeeAlso({ListenSlack.class, ConsumeSlack.class, PublishSlack.class}) +@Tags({"slack", "conversation", "reactions.get", "social media", "emoji"}) +@CapabilityDescription("Retrieves reactions for a given message. The reactions are written as attributes.") +@DefaultSettings(penaltyDuration = "5 min") +public class GetSlackReaction extends AbstractProcessor { + public static final String ATTR_WAIT_TIME = "minutes.waited"; + public static final String ATTR_ERROR_MESSAGE = "error.message"; + public static final String ATTR_MESSAGE_TIMESTAMP = "slack.message.timestamp"; + public static final String ATTR_CHANNEL_ID = "slack.channel.id"; + public static final int PENALTY_MINUTES = 5; + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("Access Token") + .description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi. This may be either a User Token or a Bot Token. " + + "It must be granted the reactions:read scope and channels:history, groups:history, im:history or mpim:history, depending on the type of conversation being used.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .sensitive(true) + .build(); + + static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder() + .name("Channel ID") + .description("The ID of the channel.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor THREAD_TIMESTAMP = new PropertyDescriptor.Builder() + .name("Message Timestamp") + .description("Slack message's timestamp the reactions are fetched for.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor RELEASE_IF_ONE_REACTION = new PropertyDescriptor.Builder() + .name("Release If One Reaction Arrived") + .description("It is possible to wait for the first reaction or wait for the specified amount of time to fetch all possible reactions.") + .required(true) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("true") + .build(); + + static final PropertyDescriptor WAIT_MONITOR_WINDOW = new PropertyDescriptor.Builder() + .name("Wait Monitor Window") + .description("Processor will periodically poll reactions for the given time duration if no reaction arrived yet or " + + RELEASE_IF_ONE_REACTION.getDisplayName() + " is set to false.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("1 hour") + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles are routed to this relationship if fetching reactions were successful") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles are routed to this relationship if an error occurred when fetching reactions") + .build(); + + static final Relationship REL_WAIT = new Relationship.Builder() + .name("wait") + .description("Self loop relationship which is used when waiting for further reactions") Review Comment: "Further reactions may occur and the waiting period has not elapsed. Consider self-looping" ########## nifi-extension-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/GetSlackReaction.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.slack; + +import com.slack.api.bolt.App; +import com.slack.api.bolt.AppConfig; +import com.slack.api.methods.MethodsClient; +import com.slack.api.methods.SlackApiException; +import com.slack.api.methods.request.reactions.ReactionsGetRequest; +import com.slack.api.methods.response.reactions.ReactionsGetResponse; +import com.slack.api.model.Reaction; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.slack.util.RateLimit; +import org.apache.nifi.processors.slack.util.SlackResponseUtil; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = GetSlackReaction.ATTR_CHANNEL_ID, description = "The ID of the Slack Channel from which the messages were retrieved"), + @WritesAttribute(attribute = GetSlackReaction.ATTR_MESSAGE_TIMESTAMP, description = "The timestamp of the Slack message the reactions are fetched for."), + @WritesAttribute(attribute = "slack.reaction.<emoji name>", description = "The name of the emoji and the reaction count that was provided for the message."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_WAIT_TIME, description = "The total number of minutes waited while the reactions were captured."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_ERROR_MESSAGE, description = "The error message on fetching reactions.") +}) +@SeeAlso({ListenSlack.class, ConsumeSlack.class, PublishSlack.class}) +@Tags({"slack", "conversation", "reactions.get", "social media", "emoji"}) +@CapabilityDescription("Retrieves reactions for a given message. The reactions are written as attributes.") +@DefaultSettings(penaltyDuration = "5 min") +public class GetSlackReaction extends AbstractProcessor { + public static final String ATTR_WAIT_TIME = "minutes.waited"; + public static final String ATTR_ERROR_MESSAGE = "error.message"; + public static final String ATTR_MESSAGE_TIMESTAMP = "slack.message.timestamp"; + public static final String ATTR_CHANNEL_ID = "slack.channel.id"; + public static final int PENALTY_MINUTES = 5; + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("Access Token") + .description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi. This may be either a User Token or a Bot Token. " + + "It must be granted the reactions:read scope and channels:history, groups:history, im:history or mpim:history, depending on the type of conversation being used.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .sensitive(true) + .build(); + + static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder() + .name("Channel ID") + .description("The ID of the channel.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor THREAD_TIMESTAMP = new PropertyDescriptor.Builder() + .name("Message Timestamp") + .description("Slack message's timestamp the reactions are fetched for.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor RELEASE_IF_ONE_REACTION = new PropertyDescriptor.Builder() + .name("Release If One Reaction Arrived") + .description("It is possible to wait for the first reaction or wait for the specified amount of time to fetch all possible reactions.") + .required(true) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("true") + .build(); + + static final PropertyDescriptor WAIT_MONITOR_WINDOW = new PropertyDescriptor.Builder() Review Comment: A clearer name might be 'WAIT_PERIOD" And the description would be "The period of time a processor will continue to search for reactions for the specified message" ########## nifi-extension-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/GetSlackReaction.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.slack; + +import com.slack.api.bolt.App; +import com.slack.api.bolt.AppConfig; +import com.slack.api.methods.MethodsClient; +import com.slack.api.methods.SlackApiException; +import com.slack.api.methods.request.reactions.ReactionsGetRequest; +import com.slack.api.methods.response.reactions.ReactionsGetResponse; +import com.slack.api.model.Reaction; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.slack.util.RateLimit; +import org.apache.nifi.processors.slack.util.SlackResponseUtil; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = GetSlackReaction.ATTR_CHANNEL_ID, description = "The ID of the Slack Channel from which the messages were retrieved"), + @WritesAttribute(attribute = GetSlackReaction.ATTR_MESSAGE_TIMESTAMP, description = "The timestamp of the Slack message the reactions are fetched for."), + @WritesAttribute(attribute = "slack.reaction.<emoji name>", description = "The name of the emoji and the reaction count that was provided for the message."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_WAIT_TIME, description = "The total number of minutes waited while the reactions were captured."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_ERROR_MESSAGE, description = "The error message on fetching reactions.") +}) +@SeeAlso({ListenSlack.class, ConsumeSlack.class, PublishSlack.class}) +@Tags({"slack", "conversation", "reactions.get", "social media", "emoji"}) +@CapabilityDescription("Retrieves reactions for a given message. The reactions are written as attributes.") +@DefaultSettings(penaltyDuration = "5 min") +public class GetSlackReaction extends AbstractProcessor { + public static final String ATTR_WAIT_TIME = "minutes.waited"; + public static final String ATTR_ERROR_MESSAGE = "error.message"; + public static final String ATTR_MESSAGE_TIMESTAMP = "slack.message.timestamp"; + public static final String ATTR_CHANNEL_ID = "slack.channel.id"; + public static final int PENALTY_MINUTES = 5; + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("Access Token") + .description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi. This may be either a User Token or a Bot Token. " + + "It must be granted the reactions:read scope and channels:history, groups:history, im:history or mpim:history, depending on the type of conversation being used.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .sensitive(true) + .build(); + + static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder() + .name("Channel ID") + .description("The ID of the channel.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor THREAD_TIMESTAMP = new PropertyDescriptor.Builder() + .name("Message Timestamp") + .description("Slack message's timestamp the reactions are fetched for.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor RELEASE_IF_ONE_REACTION = new PropertyDescriptor.Builder() + .name("Release If One Reaction Arrived") + .description("It is possible to wait for the first reaction or wait for the specified amount of time to fetch all possible reactions.") + .required(true) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("true") + .build(); + + static final PropertyDescriptor WAIT_MONITOR_WINDOW = new PropertyDescriptor.Builder() Review Comment: A clearer name might be 'WAIT_PERIOD" And the description would be "The period of time a processor will continue to search for reactions for the specified message unless releasing per reaction" ########## nifi-extension-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/GetSlackReaction.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.slack; + +import com.slack.api.bolt.App; +import com.slack.api.bolt.AppConfig; +import com.slack.api.methods.MethodsClient; +import com.slack.api.methods.SlackApiException; +import com.slack.api.methods.request.reactions.ReactionsGetRequest; +import com.slack.api.methods.response.reactions.ReactionsGetResponse; +import com.slack.api.model.Reaction; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.slack.util.RateLimit; +import org.apache.nifi.processors.slack.util.SlackResponseUtil; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = GetSlackReaction.ATTR_CHANNEL_ID, description = "The ID of the Slack Channel from which the messages were retrieved"), + @WritesAttribute(attribute = GetSlackReaction.ATTR_MESSAGE_TIMESTAMP, description = "The timestamp of the Slack message the reactions are fetched for."), + @WritesAttribute(attribute = "slack.reaction.<emoji name>", description = "The name of the emoji and the reaction count that was provided for the message."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_WAIT_TIME, description = "The total number of minutes waited while the reactions were captured."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_ERROR_MESSAGE, description = "The error message on fetching reactions.") +}) +@SeeAlso({ListenSlack.class, ConsumeSlack.class, PublishSlack.class}) +@Tags({"slack", "conversation", "reactions.get", "social media", "emoji"}) +@CapabilityDescription("Retrieves reactions for a given message. The reactions are written as attributes.") +@DefaultSettings(penaltyDuration = "5 min") +public class GetSlackReaction extends AbstractProcessor { + public static final String ATTR_WAIT_TIME = "minutes.waited"; + public static final String ATTR_ERROR_MESSAGE = "error.message"; + public static final String ATTR_MESSAGE_TIMESTAMP = "slack.message.timestamp"; + public static final String ATTR_CHANNEL_ID = "slack.channel.id"; + public static final int PENALTY_MINUTES = 5; + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("Access Token") + .description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi. This may be either a User Token or a Bot Token. " + + "It must be granted the reactions:read scope and channels:history, groups:history, im:history or mpim:history, depending on the type of conversation being used.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .sensitive(true) + .build(); + + static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder() + .name("Channel ID") + .description("The ID of the channel.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor THREAD_TIMESTAMP = new PropertyDescriptor.Builder() + .name("Message Timestamp") + .description("Slack message's timestamp the reactions are fetched for.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor RELEASE_IF_ONE_REACTION = new PropertyDescriptor.Builder() + .name("Release If One Reaction Arrived") + .description("It is possible to wait for the first reaction or wait for the specified amount of time to fetch all possible reactions.") + .required(true) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("true") + .build(); + + static final PropertyDescriptor WAIT_MONITOR_WINDOW = new PropertyDescriptor.Builder() + .name("Wait Monitor Window") + .description("Processor will periodically poll reactions for the given time duration if no reaction arrived yet or " + + RELEASE_IF_ONE_REACTION.getDisplayName() + " is set to false.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("1 hour") + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles are routed to this relationship if fetching reactions were successful") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles are routed to this relationship if an error occurred when fetching reactions") + .build(); + + static final Relationship REL_WAIT = new Relationship.Builder() + .name("wait") + .description("Self loop relationship which is used when waiting for further reactions") + .build(); + + static final Relationship REL_NO_REACTION = new Relationship.Builder() + .name("no_reaction") + .description("FlowFiles are routed to this relationship if no reaction arrived for the message in the given timeframe") + .build(); + + private static final List<PropertyDescriptor> DESCRIPTORS = List.of(CHANNEL_ID, + THREAD_TIMESTAMP, + ACCESS_TOKEN, + WAIT_MONITOR_WINDOW, + RELEASE_IF_ONE_REACTION); + + private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, + REL_WAIT, + REL_NO_REACTION, + REL_FAILURE); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + private RateLimit rateLimit; + private volatile App slackApp; + private MethodsClient client; + + @OnScheduled + public void setup(final ProcessContext context) { + rateLimit = new RateLimit(getLogger()); + slackApp = createSlackApp(context); + client = initializeClient(slackApp); + } + + @OnStopped + public void shutdown() { + if (slackApp != null) { + slackApp.stop(); + slackApp = null; + } + rateLimit = null; + } + + public RateLimit getRateLimit() { + return rateLimit; + } + + private App createSlackApp(final ProcessContext context) { + final String botToken = context.getProperty(ACCESS_TOKEN).getValue(); + final AppConfig appConfig = AppConfig.builder() + .singleTeamBotToken(botToken) + .build(); + + return new App(appConfig); + } + + protected MethodsClient initializeClient(final App slackApp) { + slackApp.start(); + return slackApp.client(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + if (rateLimit.isLimitReached()) { + getLogger().debug("Will not fetch reactions from Slack because rate limit has been reached"); + context.yield(); + return; + } + + FlowFile flowFile = session.get(); + + if (flowFile == null) { + return; + } + + final String botToken = context.getProperty(ACCESS_TOKEN).getValue(); + final String channelId = context.getProperty(CHANNEL_ID).evaluateAttributeExpressions(flowFile).getValue(); Review Comment: I dont think the user supplied processor property value is a valid use case. Also "Channel ID" doesn't seem valid as the flowfile attribute to look up as nothing preceding this component would set that. ConsumeSlack for instance sets "slack.channel.id" ########## nifi-extension-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/GetSlackReaction.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.slack; + +import com.slack.api.bolt.App; +import com.slack.api.bolt.AppConfig; +import com.slack.api.methods.MethodsClient; +import com.slack.api.methods.SlackApiException; +import com.slack.api.methods.request.reactions.ReactionsGetRequest; +import com.slack.api.methods.response.reactions.ReactionsGetResponse; +import com.slack.api.model.Reaction; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.slack.util.RateLimit; +import org.apache.nifi.processors.slack.util.SlackResponseUtil; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = GetSlackReaction.ATTR_CHANNEL_ID, description = "The ID of the Slack Channel from which the messages were retrieved"), + @WritesAttribute(attribute = GetSlackReaction.ATTR_MESSAGE_TIMESTAMP, description = "The timestamp of the Slack message the reactions are fetched for."), + @WritesAttribute(attribute = "slack.reaction.<emoji name>", description = "The name of the emoji and the reaction count that was provided for the message."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_WAIT_TIME, description = "The total number of minutes waited while the reactions were captured."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_ERROR_MESSAGE, description = "The error message on fetching reactions.") +}) +@SeeAlso({ListenSlack.class, ConsumeSlack.class, PublishSlack.class}) +@Tags({"slack", "conversation", "reactions.get", "social media", "emoji"}) +@CapabilityDescription("Retrieves reactions for a given message. The reactions are written as attributes.") +@DefaultSettings(penaltyDuration = "5 min") +public class GetSlackReaction extends AbstractProcessor { + public static final String ATTR_WAIT_TIME = "minutes.waited"; + public static final String ATTR_ERROR_MESSAGE = "error.message"; + public static final String ATTR_MESSAGE_TIMESTAMP = "slack.message.timestamp"; + public static final String ATTR_CHANNEL_ID = "slack.channel.id"; + public static final int PENALTY_MINUTES = 5; + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("Access Token") + .description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi. This may be either a User Token or a Bot Token. " + + "It must be granted the reactions:read scope and channels:history, groups:history, im:history or mpim:history, depending on the type of conversation being used.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .sensitive(true) + .build(); + + static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder() + .name("Channel ID") + .description("The ID of the channel.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor THREAD_TIMESTAMP = new PropertyDescriptor.Builder() + .name("Message Timestamp") + .description("Slack message's timestamp the reactions are fetched for.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor RELEASE_IF_ONE_REACTION = new PropertyDescriptor.Builder() + .name("Release If One Reaction Arrived") + .description("It is possible to wait for the first reaction or wait for the specified amount of time to fetch all possible reactions.") + .required(true) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("true") + .build(); + + static final PropertyDescriptor WAIT_MONITOR_WINDOW = new PropertyDescriptor.Builder() + .name("Wait Monitor Window") + .description("Processor will periodically poll reactions for the given time duration if no reaction arrived yet or " + + RELEASE_IF_ONE_REACTION.getDisplayName() + " is set to false.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("1 hour") + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles are routed to this relationship if fetching reactions were successful") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles are routed to this relationship if an error occurred when fetching reactions") + .build(); + + static final Relationship REL_WAIT = new Relationship.Builder() + .name("wait") + .description("Self loop relationship which is used when waiting for further reactions") + .build(); + + static final Relationship REL_NO_REACTION = new Relationship.Builder() + .name("no_reaction") + .description("FlowFiles are routed to this relationship if no reaction arrived for the message in the given timeframe") + .build(); + + private static final List<PropertyDescriptor> DESCRIPTORS = List.of(CHANNEL_ID, + THREAD_TIMESTAMP, + ACCESS_TOKEN, + WAIT_MONITOR_WINDOW, + RELEASE_IF_ONE_REACTION); + + private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, + REL_WAIT, + REL_NO_REACTION, + REL_FAILURE); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + private RateLimit rateLimit; + private volatile App slackApp; + private MethodsClient client; + + @OnScheduled + public void setup(final ProcessContext context) { + rateLimit = new RateLimit(getLogger()); + slackApp = createSlackApp(context); + client = initializeClient(slackApp); + } + + @OnStopped + public void shutdown() { + if (slackApp != null) { + slackApp.stop(); + slackApp = null; + } + rateLimit = null; + } + + public RateLimit getRateLimit() { + return rateLimit; + } + + private App createSlackApp(final ProcessContext context) { + final String botToken = context.getProperty(ACCESS_TOKEN).getValue(); + final AppConfig appConfig = AppConfig.builder() + .singleTeamBotToken(botToken) + .build(); + + return new App(appConfig); + } + + protected MethodsClient initializeClient(final App slackApp) { + slackApp.start(); + return slackApp.client(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + if (rateLimit.isLimitReached()) { + getLogger().debug("Will not fetch reactions from Slack because rate limit has been reached"); + context.yield(); + return; + } + + FlowFile flowFile = session.get(); + + if (flowFile == null) { + return; + } + + final String botToken = context.getProperty(ACCESS_TOKEN).getValue(); + final String channelId = context.getProperty(CHANNEL_ID).evaluateAttributeExpressions(flowFile).getValue(); + final String threadTimestamp = context.getProperty(THREAD_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue(); Review Comment: same comment as for channel id ########## nifi-extension-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/GetSlackReaction.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.slack; + +import com.slack.api.bolt.App; +import com.slack.api.bolt.AppConfig; +import com.slack.api.methods.MethodsClient; +import com.slack.api.methods.SlackApiException; +import com.slack.api.methods.request.reactions.ReactionsGetRequest; +import com.slack.api.methods.response.reactions.ReactionsGetResponse; +import com.slack.api.model.Reaction; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.slack.util.RateLimit; +import org.apache.nifi.processors.slack.util.SlackResponseUtil; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = GetSlackReaction.ATTR_CHANNEL_ID, description = "The ID of the Slack Channel from which the messages were retrieved"), + @WritesAttribute(attribute = GetSlackReaction.ATTR_MESSAGE_TIMESTAMP, description = "The timestamp of the Slack message the reactions are fetched for."), + @WritesAttribute(attribute = "slack.reaction.<emoji name>", description = "The name of the emoji and the reaction count that was provided for the message."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_WAIT_TIME, description = "The total number of minutes waited while the reactions were captured."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_ERROR_MESSAGE, description = "The error message on fetching reactions.") +}) +@SeeAlso({ListenSlack.class, ConsumeSlack.class, PublishSlack.class}) +@Tags({"slack", "conversation", "reactions.get", "social media", "emoji"}) +@CapabilityDescription("Retrieves reactions for a given message. The reactions are written as attributes.") +@DefaultSettings(penaltyDuration = "5 min") +public class GetSlackReaction extends AbstractProcessor { + public static final String ATTR_WAIT_TIME = "minutes.waited"; + public static final String ATTR_ERROR_MESSAGE = "error.message"; + public static final String ATTR_MESSAGE_TIMESTAMP = "slack.message.timestamp"; + public static final String ATTR_CHANNEL_ID = "slack.channel.id"; + public static final int PENALTY_MINUTES = 5; + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("Access Token") + .description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi. This may be either a User Token or a Bot Token. " + + "It must be granted the reactions:read scope and channels:history, groups:history, im:history or mpim:history, depending on the type of conversation being used.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .sensitive(true) + .build(); + + static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder() + .name("Channel ID") + .description("The ID of the channel.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor THREAD_TIMESTAMP = new PropertyDescriptor.Builder() + .name("Message Timestamp") + .description("Slack message's timestamp the reactions are fetched for.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor RELEASE_IF_ONE_REACTION = new PropertyDescriptor.Builder() + .name("Release If One Reaction Arrived") + .description("It is possible to wait for the first reaction or wait for the specified amount of time to fetch all possible reactions.") + .required(true) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("true") + .build(); + + static final PropertyDescriptor WAIT_MONITOR_WINDOW = new PropertyDescriptor.Builder() + .name("Wait Monitor Window") + .description("Processor will periodically poll reactions for the given time duration if no reaction arrived yet or " + + RELEASE_IF_ONE_REACTION.getDisplayName() + " is set to false.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("1 hour") + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles are routed to this relationship if fetching reactions were successful") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles are routed to this relationship if an error occurred when fetching reactions") + .build(); + + static final Relationship REL_WAIT = new Relationship.Builder() + .name("wait") + .description("Self loop relationship which is used when waiting for further reactions") + .build(); + + static final Relationship REL_NO_REACTION = new Relationship.Builder() + .name("no_reaction") + .description("FlowFiles are routed to this relationship if no reaction arrived for the message in the given timeframe") Review Comment: "in the given waiting period." ########## nifi-extension-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/GetSlackReaction.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.slack; + +import com.slack.api.bolt.App; +import com.slack.api.bolt.AppConfig; +import com.slack.api.methods.MethodsClient; +import com.slack.api.methods.SlackApiException; +import com.slack.api.methods.request.reactions.ReactionsGetRequest; +import com.slack.api.methods.response.reactions.ReactionsGetResponse; +import com.slack.api.model.Reaction; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.slack.util.RateLimit; +import org.apache.nifi.processors.slack.util.SlackResponseUtil; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = GetSlackReaction.ATTR_CHANNEL_ID, description = "The ID of the Slack Channel from which the messages were retrieved"), + @WritesAttribute(attribute = GetSlackReaction.ATTR_MESSAGE_TIMESTAMP, description = "The timestamp of the Slack message the reactions are fetched for."), + @WritesAttribute(attribute = "slack.reaction.<emoji name>", description = "The name of the emoji and the reaction count that was provided for the message."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_WAIT_TIME, description = "The total number of minutes waited while the reactions were captured."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_ERROR_MESSAGE, description = "The error message on fetching reactions.") +}) +@SeeAlso({ListenSlack.class, ConsumeSlack.class, PublishSlack.class}) +@Tags({"slack", "conversation", "reactions.get", "social media", "emoji"}) +@CapabilityDescription("Retrieves reactions for a given message. The reactions are written as attributes.") +@DefaultSettings(penaltyDuration = "5 min") +public class GetSlackReaction extends AbstractProcessor { + public static final String ATTR_WAIT_TIME = "minutes.waited"; + public static final String ATTR_ERROR_MESSAGE = "error.message"; + public static final String ATTR_MESSAGE_TIMESTAMP = "slack.message.timestamp"; + public static final String ATTR_CHANNEL_ID = "slack.channel.id"; + public static final int PENALTY_MINUTES = 5; + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("Access Token") + .description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi. This may be either a User Token or a Bot Token. " + + "It must be granted the reactions:read scope and channels:history, groups:history, im:history or mpim:history, depending on the type of conversation being used.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .sensitive(true) + .build(); + + static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder() + .name("Channel ID") + .description("The ID of the channel.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor THREAD_TIMESTAMP = new PropertyDescriptor.Builder() + .name("Message Timestamp") + .description("Slack message's timestamp the reactions are fetched for.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor RELEASE_IF_ONE_REACTION = new PropertyDescriptor.Builder() + .name("Release If One Reaction Arrived") + .description("It is possible to wait for the first reaction or wait for the specified amount of time to fetch all possible reactions.") + .required(true) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("true") + .build(); + + static final PropertyDescriptor WAIT_MONITOR_WINDOW = new PropertyDescriptor.Builder() + .name("Wait Monitor Window") + .description("Processor will periodically poll reactions for the given time duration if no reaction arrived yet or " + + RELEASE_IF_ONE_REACTION.getDisplayName() + " is set to false.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("1 hour") + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles are routed to this relationship if fetching reactions were successful") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles are routed to this relationship if an error occurred when fetching reactions") + .build(); + + static final Relationship REL_WAIT = new Relationship.Builder() + .name("wait") + .description("Self loop relationship which is used when waiting for further reactions") + .build(); + + static final Relationship REL_NO_REACTION = new Relationship.Builder() + .name("no_reaction") + .description("FlowFiles are routed to this relationship if no reaction arrived for the message in the given timeframe") + .build(); + + private static final List<PropertyDescriptor> DESCRIPTORS = List.of(CHANNEL_ID, + THREAD_TIMESTAMP, + ACCESS_TOKEN, + WAIT_MONITOR_WINDOW, + RELEASE_IF_ONE_REACTION); + + private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, + REL_WAIT, + REL_NO_REACTION, + REL_FAILURE); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + private RateLimit rateLimit; + private volatile App slackApp; + private MethodsClient client; + + @OnScheduled + public void setup(final ProcessContext context) { + rateLimit = new RateLimit(getLogger()); + slackApp = createSlackApp(context); + client = initializeClient(slackApp); + } + + @OnStopped + public void shutdown() { + if (slackApp != null) { + slackApp.stop(); + slackApp = null; + } + rateLimit = null; + } + + public RateLimit getRateLimit() { + return rateLimit; + } + + private App createSlackApp(final ProcessContext context) { + final String botToken = context.getProperty(ACCESS_TOKEN).getValue(); + final AppConfig appConfig = AppConfig.builder() + .singleTeamBotToken(botToken) + .build(); + + return new App(appConfig); + } + + protected MethodsClient initializeClient(final App slackApp) { + slackApp.start(); + return slackApp.client(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + if (rateLimit.isLimitReached()) { Review Comment: If i understand correctly this is a general rate limit indicator. Does it also makes sense to limit how aggressively we'd check on the reactions for a single message? If there are only a few messages and they're in a tight loop here it would hit the Slack API constantly if I read this right. Perhaps you want to protect that by ensuring no message gets checked for reactions faster than once per every N seconds. ########## nifi-extension-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/GetSlackReaction.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.slack; + +import com.slack.api.bolt.App; +import com.slack.api.bolt.AppConfig; +import com.slack.api.methods.MethodsClient; +import com.slack.api.methods.SlackApiException; +import com.slack.api.methods.request.reactions.ReactionsGetRequest; +import com.slack.api.methods.response.reactions.ReactionsGetResponse; +import com.slack.api.model.Reaction; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.slack.util.RateLimit; +import org.apache.nifi.processors.slack.util.SlackResponseUtil; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = GetSlackReaction.ATTR_CHANNEL_ID, description = "The ID of the Slack Channel from which the messages were retrieved"), + @WritesAttribute(attribute = GetSlackReaction.ATTR_MESSAGE_TIMESTAMP, description = "The timestamp of the Slack message the reactions are fetched for."), + @WritesAttribute(attribute = "slack.reaction.<emoji name>", description = "The name of the emoji and the reaction count that was provided for the message."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_WAIT_TIME, description = "The total number of minutes waited while the reactions were captured."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_ERROR_MESSAGE, description = "The error message on fetching reactions.") +}) +@SeeAlso({ListenSlack.class, ConsumeSlack.class, PublishSlack.class}) +@Tags({"slack", "conversation", "reactions.get", "social media", "emoji"}) +@CapabilityDescription("Retrieves reactions for a given message. The reactions are written as attributes.") +@DefaultSettings(penaltyDuration = "5 min") +public class GetSlackReaction extends AbstractProcessor { + public static final String ATTR_WAIT_TIME = "minutes.waited"; + public static final String ATTR_ERROR_MESSAGE = "error.message"; + public static final String ATTR_MESSAGE_TIMESTAMP = "slack.message.timestamp"; + public static final String ATTR_CHANNEL_ID = "slack.channel.id"; + public static final int PENALTY_MINUTES = 5; + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("Access Token") + .description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi. This may be either a User Token or a Bot Token. " + + "It must be granted the reactions:read scope and channels:history, groups:history, im:history or mpim:history, depending on the type of conversation being used.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .sensitive(true) + .build(); + + static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder() + .name("Channel ID") + .description("The ID of the channel.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor THREAD_TIMESTAMP = new PropertyDescriptor.Builder() + .name("Message Timestamp") + .description("Slack message's timestamp the reactions are fetched for.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor RELEASE_IF_ONE_REACTION = new PropertyDescriptor.Builder() + .name("Release If One Reaction Arrived") + .description("It is possible to wait for the first reaction or wait for the specified amount of time to fetch all possible reactions.") + .required(true) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("true") + .build(); + + static final PropertyDescriptor WAIT_MONITOR_WINDOW = new PropertyDescriptor.Builder() + .name("Wait Monitor Window") + .description("Processor will periodically poll reactions for the given time duration if no reaction arrived yet or " + + RELEASE_IF_ONE_REACTION.getDisplayName() + " is set to false.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("1 hour") + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles are routed to this relationship if fetching reactions were successful") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles are routed to this relationship if an error occurred when fetching reactions") + .build(); + + static final Relationship REL_WAIT = new Relationship.Builder() + .name("wait") + .description("Self loop relationship which is used when waiting for further reactions") + .build(); + + static final Relationship REL_NO_REACTION = new Relationship.Builder() + .name("no_reaction") + .description("FlowFiles are routed to this relationship if no reaction arrived for the message in the given timeframe") + .build(); + + private static final List<PropertyDescriptor> DESCRIPTORS = List.of(CHANNEL_ID, + THREAD_TIMESTAMP, + ACCESS_TOKEN, + WAIT_MONITOR_WINDOW, + RELEASE_IF_ONE_REACTION); + + private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, + REL_WAIT, + REL_NO_REACTION, + REL_FAILURE); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + private RateLimit rateLimit; + private volatile App slackApp; + private MethodsClient client; + + @OnScheduled + public void setup(final ProcessContext context) { + rateLimit = new RateLimit(getLogger()); + slackApp = createSlackApp(context); + client = initializeClient(slackApp); + } + + @OnStopped + public void shutdown() { + if (slackApp != null) { + slackApp.stop(); + slackApp = null; + } + rateLimit = null; + } + + public RateLimit getRateLimit() { + return rateLimit; + } + + private App createSlackApp(final ProcessContext context) { + final String botToken = context.getProperty(ACCESS_TOKEN).getValue(); + final AppConfig appConfig = AppConfig.builder() + .singleTeamBotToken(botToken) + .build(); + + return new App(appConfig); + } + + protected MethodsClient initializeClient(final App slackApp) { + slackApp.start(); + return slackApp.client(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + if (rateLimit.isLimitReached()) { + getLogger().debug("Will not fetch reactions from Slack because rate limit has been reached"); + context.yield(); + return; + } + + FlowFile flowFile = session.get(); + + if (flowFile == null) { + return; + } + + final String botToken = context.getProperty(ACCESS_TOKEN).getValue(); + final String channelId = context.getProperty(CHANNEL_ID).evaluateAttributeExpressions(flowFile).getValue(); Review Comment: I dont think the user supplied processor property value is a valid use case. ########## nifi-extension-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/processors/slack/GetSlackReaction.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.slack; + +import com.slack.api.bolt.App; +import com.slack.api.bolt.AppConfig; +import com.slack.api.methods.MethodsClient; +import com.slack.api.methods.SlackApiException; +import com.slack.api.methods.request.reactions.ReactionsGetRequest; +import com.slack.api.methods.response.reactions.ReactionsGetResponse; +import com.slack.api.model.Reaction; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.slack.util.RateLimit; +import org.apache.nifi.processors.slack.util.SlackResponseUtil; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = GetSlackReaction.ATTR_CHANNEL_ID, description = "The ID of the Slack Channel from which the messages were retrieved"), + @WritesAttribute(attribute = GetSlackReaction.ATTR_MESSAGE_TIMESTAMP, description = "The timestamp of the Slack message the reactions are fetched for."), + @WritesAttribute(attribute = "slack.reaction.<emoji name>", description = "The name of the emoji and the reaction count that was provided for the message."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_WAIT_TIME, description = "The total number of minutes waited while the reactions were captured."), + @WritesAttribute(attribute = GetSlackReaction.ATTR_ERROR_MESSAGE, description = "The error message on fetching reactions.") +}) +@SeeAlso({ListenSlack.class, ConsumeSlack.class, PublishSlack.class}) +@Tags({"slack", "conversation", "reactions.get", "social media", "emoji"}) +@CapabilityDescription("Retrieves reactions for a given message. The reactions are written as attributes.") +@DefaultSettings(penaltyDuration = "5 min") +public class GetSlackReaction extends AbstractProcessor { + public static final String ATTR_WAIT_TIME = "minutes.waited"; + public static final String ATTR_ERROR_MESSAGE = "error.message"; + public static final String ATTR_MESSAGE_TIMESTAMP = "slack.message.timestamp"; + public static final String ATTR_CHANNEL_ID = "slack.channel.id"; + public static final int PENALTY_MINUTES = 5; + + static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("Access Token") + .description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi. This may be either a User Token or a Bot Token. " + + "It must be granted the reactions:read scope and channels:history, groups:history, im:history or mpim:history, depending on the type of conversation being used.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .sensitive(true) + .build(); + + static final PropertyDescriptor CHANNEL_ID = new PropertyDescriptor.Builder() + .name("Channel ID") + .description("The ID of the channel.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor THREAD_TIMESTAMP = new PropertyDescriptor.Builder() + .name("Message Timestamp") + .description("Slack message's timestamp the reactions are fetched for.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor RELEASE_IF_ONE_REACTION = new PropertyDescriptor.Builder() + .name("Release If One Reaction Arrived") + .description("It is possible to wait for the first reaction or wait for the specified amount of time to fetch all possible reactions.") + .required(true) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("true") + .build(); + + static final PropertyDescriptor WAIT_MONITOR_WINDOW = new PropertyDescriptor.Builder() + .name("Wait Monitor Window") + .description("Processor will periodically poll reactions for the given time duration if no reaction arrived yet or " + + RELEASE_IF_ONE_REACTION.getDisplayName() + " is set to false.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("1 hour") + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles are routed to this relationship if fetching reactions were successful") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles are routed to this relationship if an error occurred when fetching reactions") + .build(); + + static final Relationship REL_WAIT = new Relationship.Builder() + .name("wait") + .description("Self loop relationship which is used when waiting for further reactions") + .build(); + + static final Relationship REL_NO_REACTION = new Relationship.Builder() + .name("no_reaction") + .description("FlowFiles are routed to this relationship if no reaction arrived for the message in the given timeframe") + .build(); + + private static final List<PropertyDescriptor> DESCRIPTORS = List.of(CHANNEL_ID, + THREAD_TIMESTAMP, + ACCESS_TOKEN, + WAIT_MONITOR_WINDOW, + RELEASE_IF_ONE_REACTION); + + private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, + REL_WAIT, + REL_NO_REACTION, + REL_FAILURE); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + private RateLimit rateLimit; + private volatile App slackApp; + private MethodsClient client; + + @OnScheduled + public void setup(final ProcessContext context) { + rateLimit = new RateLimit(getLogger()); + slackApp = createSlackApp(context); + client = initializeClient(slackApp); + } + + @OnStopped + public void shutdown() { + if (slackApp != null) { + slackApp.stop(); + slackApp = null; + } + rateLimit = null; + } + + public RateLimit getRateLimit() { + return rateLimit; + } + + private App createSlackApp(final ProcessContext context) { + final String botToken = context.getProperty(ACCESS_TOKEN).getValue(); + final AppConfig appConfig = AppConfig.builder() + .singleTeamBotToken(botToken) + .build(); + + return new App(appConfig); + } + + protected MethodsClient initializeClient(final App slackApp) { + slackApp.start(); + return slackApp.client(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + if (rateLimit.isLimitReached()) { + getLogger().debug("Will not fetch reactions from Slack because rate limit has been reached"); + context.yield(); + return; + } + + FlowFile flowFile = session.get(); + + if (flowFile == null) { + return; + } + + final String botToken = context.getProperty(ACCESS_TOKEN).getValue(); + final String channelId = context.getProperty(CHANNEL_ID).evaluateAttributeExpressions(flowFile).getValue(); Review Comment: I dont think the user supplied processor property value is a valid use case. ########## nifi-extension-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/processors/slack/TestGetSlackReaction.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.slack; + +import com.slack.api.bolt.App; +import com.slack.api.methods.MethodsClient; +import com.slack.api.methods.request.reactions.ReactionsGetRequest; +import com.slack.api.methods.response.reactions.ReactionsGetResponse; +import com.slack.api.model.Reaction; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; + +import static java.util.Collections.emptyList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; +@ExtendWith(MockitoExtension.class) +public class TestGetSlackReaction { + public static final String TEST_MESSAGE_TS = "123456.789"; + public static final String TEST_CHANNEL_ID = "cid1"; + private TestRunner testRunner; + GetSlackReaction processor; + @Mock + private MethodsClient clientMock; + + @BeforeEach + public void setup() { + processor = new GetSlackReaction() { + @Override + protected MethodsClient initializeClient(final App slackApp) { + return clientMock; + } + }; + + testRunner = TestRunners.newTestRunner(processor); + testRunner.setProperty(GetSlackReaction.ACCESS_TOKEN, "token"); + testRunner.setProperty(GetSlackReaction.CHANNEL_ID, TEST_CHANNEL_ID); Review Comment: This test is helpful to better understand what the proposed use case is. Here it shows the user supplying a specific processor property value for the channel and timestamp of the message to gather reactions for. This is not a valid use case for NiFi. The valid use case would be for these attributes to come from the incoming flow file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
