ijokarumawak commented on a change in pull request #3344: NIFI-6092 Create 
ListenSlack and FetchSlack processors
URL: https://github.com/apache/nifi/pull/3344#discussion_r269842088
 
 

 ##########
 File path: 
nifi-nar-bundles/nifi-slack-bundle/nifi-slack-services-rtm/src/main/java/org/apache/nifi/processors/slack/controllers/RTMSlackConnectionService.java
 ##########
 @@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.slack.controllers;
+
+import com.github.seratch.jslack.Slack;
+import com.github.seratch.jslack.api.rtm.RTMClient;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+import javax.websocket.DeploymentException;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.util.StandardValidators;
+@CapabilityDescription("Implementation of SlackConnectionService." +
+  "This service connects to the Real Time Messaging API of slack.")
+public class RTMSlackConnectionService extends AbstractControllerService 
implements SlackConnectionService {
+
+  private static final PropertyDescriptor API_TOKEN = new 
PropertyDescriptor.Builder()
+    .name("api-token")
+    .displayName("API token")
+    .description("Slack auth token required for Real Time Messaging API")
+    .sensitive(true)
+    .required(true)
+    .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+    .build();
+
+  private ConcurrentHashMap<String, Consumer<String>> map = new 
ConcurrentHashMap<>();
+
+  private RTMClient rtmClient;
+
+  @OnEnabled
+  public void startClient(ConfigurationContext context) {
+    try {
+      rtmClient = getRtmClient(context);
+      rtmClient.addMessageHandler(this::sendMessage);
+      rtmClient.addCloseHandler(closeReason -> getLogger().info("Slack RTM 
Client closed: " + closeReason.toString()));
+      rtmClient.addErrorHandler(throwable -> getLogger().error("Slack RTM 
Client error:", throwable));
+      rtmClient.connect();
 
 Review comment:
   I haven't tested but I assume, in a clustered environment, even if processor 
using this controller runs on a PrimaryNode only, this method is called and RTM 
client runs on every node. The received messages are discarded at `sendMessage` 
method. That increases number of API calls and affects negatively against api 
rate limits. The more nodes, the quicker reaching to the limit.
   https://api.slack.com/docs/rate-limits
   
   I think we should start RTM client lazily, for example, when the 
`registerProcessor` called first time.
   
   Also, we may need to call start/stopClient when the primary node state 
changes. It can be done at a method annotated with `@OnPrimaryNodeStateChange`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to