This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 1208e03b4c NIFI-8497 Added SlackRecordSink
1208e03b4c is described below
commit 1208e03b4cee85e0e4c9fadf807549985eb3a2b3
Author: Emilio Setiadarma <[email protected]>
AuthorDate: Mon Oct 24 18:19:29 2022 -0700
NIFI-8497 Added SlackRecordSink
This closes #6593
Signed-off-by: David Handermann <[email protected]>
---
.../nifi-slack-bundle/nifi-slack-nar/pom.xml | 6 +
.../nifi-slack-processors/pom.xml | 50 +++++
.../nifi/services/slack/PostMessageResponse.java | 67 ++++++
.../nifi/services/slack/SlackRecordSink.java | 157 ++++++++++++++
.../nifi/services/slack/SlackRestService.java | 121 +++++++++++
.../services/slack/SlackRestServiceException.java | 27 +++
.../org.apache.nifi.controller.ControllerService | 15 ++
.../nifi/services/slack/TestSlackRecordSink.java | 228 +++++++++++++++++++++
8 files changed, 671 insertions(+)
diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/pom.xml
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/pom.xml
index 46f5d27717..9fea5bafd1 100644
--- a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/pom.xml
@@ -35,5 +35,11 @@
<artifactId>nifi-slack-processors</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-standard-services-api-nar</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
</dependencies>
</project>
diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml
index e6b25f4a6b..6d957e1fbb 100644
--- a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml
@@ -85,5 +85,55 @@
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-sink-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock-record-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-web-client-api</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-web-client-provider-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-jsr310</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-web-client-provider-service</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-proxy-configuration-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git
a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/PostMessageResponse.java
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/PostMessageResponse.java
new file mode 100644
index 0000000000..6ebea1cffa
--- /dev/null
+++
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/PostMessageResponse.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import java.time.Instant;
+
+public class PostMessageResponse {
+ private boolean ok;
+ private String channel;
+ private Instant ts;
+ private String error;
+ private String warning;
+
+ public boolean isOk() {
+ return ok;
+ }
+
+ public void setOk(boolean ok) {
+ this.ok = ok;
+ }
+
+ public String getChannel() {
+ return channel;
+ }
+
+ public void setChannel(String channel) {
+ this.channel = channel;
+ }
+
+ public Instant getTs() {
+ return ts;
+ }
+
+ public void setTs(Instant ts) {
+ this.ts = ts;
+ }
+
+ public String getError() {
+ return error;
+ }
+
+ public void setError(String error) {
+ this.error = error;
+ }
+
+ public String getWarning() {
+ return warning;
+ }
+
+ public void setWarning(String warning) {
+ this.warning = warning;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRecordSink.java
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRecordSink.java
new file mode 100644
index 0000000000..f833122cec
--- /dev/null
+++
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRecordSink.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"slack", "record", "sink"})
+@CapabilityDescription("Format and send Records to a configured Channel using
the Slack Post Message API. " +
+ "The service requires a Slack App with a Bot User configured for
access to a Slack workspace. " +
+ "The Bot User OAuth Bearer Token is required for posting messages to
Slack.")
+public class SlackRecordSink extends AbstractControllerService implements
RecordSinkService {
+
+ private static final String SLACK_API_URL = "https://slack.com/api";
+
+ public static final PropertyDescriptor API_URL = new
PropertyDescriptor.Builder()
+ .name("api-url")
+ .displayName("API URL")
+ .description("Slack Web API URL for posting text messages to
channels." +
+ " It only needs to be changed if Slack changes its API
URL.")
+ .required(true)
+ .defaultValue(SLACK_API_URL)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(StandardValidators.URL_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor ACCESS_TOKEN = new
PropertyDescriptor.Builder()
+ .name("access-token")
+ .displayName("Access Token")
+ .description("Bot OAuth Token used for authenticating and
authorizing the Slack request sent by NiFi.")
+ .required(true)
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor CHANNEL_ID = new
PropertyDescriptor.Builder()
+ .name("channel-id")
+ .displayName("Channel ID")
+ .description("Slack channel, private group, or IM channel to send
the message to. Use Channel ID instead of the name.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor INPUT_CHARACTER_SET = new
PropertyDescriptor.Builder()
+ .name("input-character-set")
+ .displayName("Input Character Set")
+ .description("Specifies the character set of the records used to
generate the Slack message.")
+ .required(true)
+ .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+ .defaultValue(StandardCharsets.UTF_8.name())
+ .build();
+
+ public static final PropertyDescriptor WEB_SERVICE_CLIENT_PROVIDER = new
PropertyDescriptor.Builder()
+ .name("web-service-client-provider")
+ .displayName("Web Service Client Provider")
+ .description("Controller service to provide HTTP client for
communicating with Slack API")
+ .required(true)
+ .identifiesControllerService(WebClientServiceProvider.class)
+ .build();
+
+ private volatile RecordSetWriterFactory writerFactory;
+ private SlackRestService service;
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return Collections.unmodifiableList(Arrays.asList(
+ API_URL,
+ ACCESS_TOKEN,
+ CHANNEL_ID,
+ INPUT_CHARACTER_SET,
+ RECORD_WRITER_FACTORY,
+ WEB_SERVICE_CLIENT_PROVIDER
+ ));
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ writerFactory =
context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+ final WebClientServiceProvider webClientServiceProvider = context
+ .getProperty(WEB_SERVICE_CLIENT_PROVIDER)
+ .asControllerService(WebClientServiceProvider.class);
+ final String accessToken =
context.getProperty(ACCESS_TOKEN).getValue();
+ final String apiUrl = context.getProperty(API_URL).getValue();
+ final String charset =
context.getProperty(INPUT_CHARACTER_SET).getValue();
+ service = new SlackRestService(webClientServiceProvider, accessToken,
apiUrl, charset, getLogger());
+ }
+
+ @Override
+ public WriteResult sendData(final RecordSet recordSet, final Map<String,
String> attributes, final boolean sendZeroResults) throws IOException {
+ WriteResult writeResult;
+ final String channel =
getConfigurationContext().getProperty(CHANNEL_ID).getValue();
+ int recordCount = 0;
+ try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ try (final RecordSetWriter writer =
writerFactory.createWriter(getLogger(), recordSet.getSchema(), out,
attributes)) {
+ writer.beginRecordSet();
+ Record record = recordSet.next();
+ while (record != null) {
+ writer.write(record);
+ writer.flush();
+ record = recordSet.next();
+ recordCount++;
+ }
+ writeResult = writer.finishRecordSet();
+ writer.flush();
+ } catch (final SchemaNotFoundException e) {
+ final String errorMessage = String.format("RecordSetWriter
could not be created because the schema was not found. The schema name for the
RecordSet to write is %s",
+ recordSet.getSchema().getSchemaName());
+ throw new ProcessException(errorMessage, e);
+ }
+ if (recordCount > 0 || sendZeroResults) {
+ try {
+ final String message = out.toString();
+ service.sendMessageToChannel(message, channel);
+ } catch (final SlackRestServiceException e) {
+ throw new IOException("Failed to send messages to Slack",
e);
+ }
+ }
+ }
+ return writeResult;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java
new file mode 100644
index 0000000000..3a23eb3e7b
--- /dev/null
+++
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+ private static final String POST_MESSAGE_PATH = "chat.postMessage";
+ private final WebClientServiceProvider webClientServiceProvider;
+ private final String accessToken;
+ private final String apiUrl;
+ private final ObjectMapper objectMapper;
+ private final ComponentLog logger;
+ private final String charset;
+
+ public SlackRestService(final WebClientServiceProvider
webClientServiceProvider,
+ final String accessToken,
+ final String apiUrl,
+ final String charset,
+ final ComponentLog logger) {
+ this.webClientServiceProvider = webClientServiceProvider;
+ this.accessToken = accessToken;
+ this.apiUrl = apiUrl;
+ this.objectMapper = new ObjectMapper();
+ objectMapper.registerModule(new JavaTimeModule());
+
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
+ this.charset = charset;
+ this.logger = logger;
+ }
+
+ public void sendMessageToChannel(final String message, final String
channel) throws SlackRestServiceException {
+ final URI apiUri = URI.create(apiUrl);
+ final HttpUriBuilder uriBuilder =
webClientServiceProvider.getHttpUriBuilder()
+ .scheme(apiUri.getScheme())
+ .host(apiUri.getHost())
+ .encodedPath(apiUri.getPath())
+ .addPathSegment(POST_MESSAGE_PATH);
+ if (apiUri.getPort() != -1) {
+ uriBuilder.port(apiUri.getPort());
+ }
+ final URI uri = uriBuilder.build();
+
+ final ObjectNode requestBodyJson = createRequestBody(channel, message);
+
+ final InputStream requestBodyInputStream;
+ try {
+ requestBodyInputStream = new
ByteArrayInputStream(objectMapper.writeValueAsBytes(requestBodyJson));
+ } catch (final JsonProcessingException e) {
+ throw new SlackRestServiceException("JSON message serialization
failed", e);
+ }
+
+ try (final HttpResponseEntity response =
webClientServiceProvider.getWebClientService()
+ .post()
+ .uri(uri)
+ .header("Authorization", String.format("Bearer %s",
accessToken))
+ .header("Content-Type", String.format("application/json;
charset=\"%s\"", charset))
+ .body(requestBodyInputStream,
OptionalLong.of(requestBodyInputStream.available()))
+ .retrieve()) {
+ final int statusCode = response.statusCode();
+ if (!(statusCode >= 200 && statusCode < 300)) {
+ throw new SlackRestServiceException("HTTP error code: " +
statusCode);
+ }
+
+ try {
+ final PostMessageResponse slackResponse =
objectMapper.readValue(response.body(), PostMessageResponse.class);
+ checkResponse(slackResponse, channel);
+ } catch (final IOException e) {
+ throw new SlackRestServiceException("JSON response parsing
failed", e);
+ }
+
+ } catch (final IOException e) {
+ throw new ProcessException("Slack HTTP request failed", e);
+ }
+ }
+
+ private ObjectNode createRequestBody(final String channel, final String
message) {
+ final ObjectNode requestBodyJson = objectMapper.createObjectNode();
+ requestBodyJson.put("channel", channel);
+ requestBodyJson.put("text", message);
+ return requestBodyJson;
+ }
+
+ private void checkResponse(final PostMessageResponse response, final
String channel) throws SlackRestServiceException {
+ if (!response.isOk()) {
+ throw new SlackRestServiceException("Slack error response: " +
response.getError());
+ }
+
+ if (response.getWarning() != null) {
+ logger.warn("Post message to channel [{}] warning: {}", channel,
response.getWarning());
+ }
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestServiceException.java
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestServiceException.java
new file mode 100644
index 0000000000..b2f09baf62
--- /dev/null
+++
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestServiceException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+class SlackRestServiceException extends Exception {
+ SlackRestServiceException(final String message) {
+ super(message);
+ }
+
+ SlackRestServiceException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000000..5069eb222b
--- /dev/null
+++
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.services.slack.SlackRecordSink
diff --git
a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/services/slack/TestSlackRecordSink.java
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/services/slack/TestSlackRecordSink.java
new file mode 100644
index 0000000000..8ab2896d7b
--- /dev/null
+++
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/services/slack/TestSlackRecordSink.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import
org.apache.nifi.web.client.provider.service.StandardWebClientServiceProvider;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestSlackRecordSink {
+ private static final String RESPONSE_SUCCESS_TEXT_MSG_WITH_TIMESTAMP =
"{\"ok\": true, \"ts\": \"1503435956.000247\"}";
+ private static final String RESPONSE_WARNING = "{\"ok\": true,
\"warning\": \"slack-warning\"}";
+ private static final String RESPONSE_ERROR = "{\"ok\": false, \"error\":
\"slack-error\"}";
+ private static final String RESPONSE_EMPTY_JSON = "{}";
+ private static final String RESPONSE_INVALID_JSON = "{invalid-json}";
+
+ private static final String CHANNEL_NAME = "my-channel";
+ private static final String BEARER_TOKEN = "bearer-token";
+
+ private TestRunner testRunner;
+ private MockWebServer mockWebServer;
+ private SlackRecordSink slackRecordSink;
+ private MockRecordWriter writerFactory;
+ private RecordSet recordSet;
+ private String recordContentsAsString;
+ private ObjectMapper mapper;
+
+ @BeforeEach
+ public void setup() throws InitializationException, IOException {
+ mapper = new ObjectMapper();
+
+ mockWebServer = new MockWebServer();
+ mockWebServer.start();
+ String url = mockWebServer.url("/api/").toString();
+
+ testRunner = TestRunners.newTestRunner(NoOpProcessor.class);
+
+ final WebClientServiceProvider webClientServiceProvider = new
StandardWebClientServiceProvider();
+ testRunner.addControllerService("webClientServiceProvider",
webClientServiceProvider);
+ testRunner.enableControllerService(webClientServiceProvider);
+
+ slackRecordSink = new SlackRecordSink();
+
+ testRunner.addControllerService("slackRecordSink", slackRecordSink);
+ testRunner.setProperty(slackRecordSink, SlackRecordSink.API_URL, url);
+ testRunner.setProperty(slackRecordSink, SlackRecordSink.ACCESS_TOKEN,
BEARER_TOKEN);
+ testRunner.setProperty(slackRecordSink, SlackRecordSink.CHANNEL_ID,
CHANNEL_NAME);
+ testRunner.setProperty(slackRecordSink,
SlackRecordSink.WEB_SERVICE_CLIENT_PROVIDER, "webClientServiceProvider");
+
+ writerFactory = new MockRecordWriter();
+ testRunner.addControllerService("writer", writerFactory);
+ testRunner.setProperty(slackRecordSink,
SlackRecordSink.RECORD_WRITER_FACTORY, "writer");
+
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("a", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("b",
RecordFieldType.BOOLEAN.getDataType()));
+
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ final Map<String, Object> valueMap1 = new HashMap<>();
+ valueMap1.put("a", "Hello");
+ valueMap1.put("b", true);
+ final Record record1 = new MapRecord(schema, valueMap1);
+
+ final Map<String, Object> valueMap2 = new HashMap<>();
+ valueMap2.put("a", "World");
+ valueMap2.put("b", false);
+ final Record record2 = new MapRecord(schema, valueMap2);
+
+ recordContentsAsString = "\"Hello\",\"true\"\n\"World\",\"false\"\n";
+ recordSet = RecordSet.of(schema, record1, record2);
+ }
+
+ @AfterEach
+ public void cleanUp() throws IOException {
+ mockWebServer.shutdown();
+ }
+
+ @Test
+ public void testSendMessage() throws IOException {
+ testRunner.enableControllerService(writerFactory);
+ testRunner.assertValid(slackRecordSink);
+ testRunner.enableControllerService(slackRecordSink);
+
+ mockWebServer.enqueue(new
MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_TEXT_MSG_WITH_TIMESTAMP));
+
+ final WriteResult writeResult = slackRecordSink.sendData(recordSet,
Collections.emptyMap(), false);
+
+ assertNotNull(writeResult);
+ assertEquals(2, writeResult.getRecordCount());
+ assertEquals(Collections.EMPTY_MAP, writeResult.getAttributes());
+ final JsonNode requestBodyJson = getRequestBodyJson();
+ assertEquals(CHANNEL_NAME, requestBodyJson.get("channel").asText());
+ assertEquals(recordContentsAsString,
requestBodyJson.get("text").asText());
+ }
+
+ @Test
+ public void testNotValidIfChannelEmpty() {
+ testRunner.setProperty(slackRecordSink, SlackRecordSink.CHANNEL_ID,
(String) null);
+
+ testRunner.enableControllerService(writerFactory);
+ testRunner.assertNotValid(slackRecordSink);
+ }
+
+ @Test
+ public void testNotValidIfBearerTokenEmpty() {
+ testRunner.setProperty(slackRecordSink, SlackRecordSink.ACCESS_TOKEN,
(String) null);
+
+ testRunner.enableControllerService(writerFactory);
+ testRunner.assertNotValid(slackRecordSink);
+ }
+
+ @Test
+ public void testFailureWhenHttpErrorCodeReturned() {
+ testRunner.enableControllerService(writerFactory);
+ testRunner.assertValid(slackRecordSink);
+ testRunner.enableControllerService(slackRecordSink);
+
+ mockWebServer.enqueue(new MockResponse().setResponseCode(500));
+
+ final IOException e = assertThrows(IOException.class, () ->
slackRecordSink.sendData(recordSet, Collections.emptyMap(), false));
+ assertTrue(e.getCause().getMessage().contains("500"));
+ }
+
+ @Test
+ public void testFailureWhenSlackReturnsError() {
+ testRunner.enableControllerService(writerFactory);
+ testRunner.assertValid(slackRecordSink);
+ testRunner.enableControllerService(slackRecordSink);
+
+ mockWebServer.enqueue(new
MockResponse().setResponseCode(200).setBody(RESPONSE_ERROR));
+
+ final IOException e = assertThrows(IOException.class, () ->
slackRecordSink.sendData(recordSet, Collections.emptyMap(), false));
+ assertTrue(e.getCause().getMessage().contains("slack-error"));
+ }
+
+ @Test
+ public void testNoFailureWhenSlackReturnsWarning() {
+ testRunner.enableControllerService(writerFactory);
+ testRunner.assertValid(slackRecordSink);
+ testRunner.enableControllerService(slackRecordSink);
+
+ mockWebServer.enqueue(new
MockResponse().setResponseCode(200).setBody(RESPONSE_WARNING));
+
+ assertDoesNotThrow(() -> {
+ slackRecordSink.sendData(recordSet, Collections.emptyMap(), false);
+ });
+ }
+
+ @Test
+ public void testFailureWhenSlackReturnsEmptyJson() {
+ testRunner.enableControllerService(writerFactory);
+ testRunner.assertValid(slackRecordSink);
+ testRunner.enableControllerService(slackRecordSink);
+
+ mockWebServer.enqueue(new
MockResponse().setResponseCode(200).setBody(RESPONSE_EMPTY_JSON));
+
+ final IOException e = assertThrows(IOException.class, () ->
slackRecordSink.sendData(recordSet, Collections.emptyMap(), false));
+ assertTrue(e.getCause().getMessage().contains("null"));
+ }
+
+ @Test
+ public void testFailureWhenSlackReturnsInvalidJson() {
+ testRunner.enableControllerService(writerFactory);
+ testRunner.assertValid(slackRecordSink);
+ testRunner.enableControllerService(slackRecordSink);
+
+ mockWebServer.enqueue(new
MockResponse().setResponseCode(200).setBody(RESPONSE_INVALID_JSON));
+
+ final IOException e = assertThrows(IOException.class, () ->
slackRecordSink.sendData(recordSet, Collections.emptyMap(), false));
+ assertTrue(e.getCause().getMessage().contains("parsing"));
+ }
+
+ private JsonNode getRequestBodyJson() {
+ try {
+ final RecordedRequest recordedRequest =
mockWebServer.takeRequest();
+ return mapper.readTree(recordedRequest.getBody().inputStream());
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}