This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 1208e03b4c NIFI-8497 Added SlackRecordSink
1208e03b4c is described below

commit 1208e03b4cee85e0e4c9fadf807549985eb3a2b3
Author: Emilio Setiadarma <[email protected]>
AuthorDate: Mon Oct 24 18:19:29 2022 -0700

    NIFI-8497 Added SlackRecordSink
    
    This closes #6593
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi-slack-bundle/nifi-slack-nar/pom.xml       |   6 +
 .../nifi-slack-processors/pom.xml                  |  50 +++++
 .../nifi/services/slack/PostMessageResponse.java   |  67 ++++++
 .../nifi/services/slack/SlackRecordSink.java       | 157 ++++++++++++++
 .../nifi/services/slack/SlackRestService.java      | 121 +++++++++++
 .../services/slack/SlackRestServiceException.java  |  27 +++
 .../org.apache.nifi.controller.ControllerService   |  15 ++
 .../nifi/services/slack/TestSlackRecordSink.java   | 228 +++++++++++++++++++++
 8 files changed, 671 insertions(+)

diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/pom.xml 
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/pom.xml
index 46f5d27717..9fea5bafd1 100644
--- a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-nar/pom.xml
@@ -35,5 +35,11 @@
             <artifactId>nifi-slack-processors</artifactId>
             <version>2.0.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
     </dependencies>
 </project>
diff --git a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml 
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml
index e6b25f4a6b..6d957e1fbb 100644
--- a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml
@@ -85,5 +85,55 @@
             <artifactId>nifi-ssl-context-service-api</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-sink-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-web-client-api</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-web-client-provider-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jsr310</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-web-client-provider-service</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-proxy-configuration-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 </project>
diff --git 
a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/PostMessageResponse.java
 
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/PostMessageResponse.java
new file mode 100644
index 0000000000..6ebea1cffa
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/PostMessageResponse.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import java.time.Instant;
+
+public class PostMessageResponse {
+    private boolean ok;
+    private String channel;
+    private Instant ts;
+    private String error;
+    private String warning;
+
+    public boolean isOk() {
+        return ok;
+    }
+
+    public void setOk(boolean ok) {
+        this.ok = ok;
+    }
+
+    public String getChannel() {
+        return channel;
+    }
+
+    public void setChannel(String channel) {
+        this.channel = channel;
+    }
+
+    public Instant getTs() {
+        return ts;
+    }
+
+    public void setTs(Instant ts) {
+        this.ts = ts;
+    }
+
+    public String getError() {
+        return error;
+    }
+
+    public void setError(String error) {
+        this.error = error;
+    }
+
+    public String getWarning() {
+        return warning;
+    }
+
+    public void setWarning(String warning) {
+        this.warning = warning;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRecordSink.java
 
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRecordSink.java
new file mode 100644
index 0000000000..f833122cec
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRecordSink.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"slack", "record", "sink"})
+@CapabilityDescription("Format and send Records to a configured Channel using the Slack Post Message API. " +
+        "The service requires a Slack App with a Bot User configured for access to a Slack workspace. " +
+        "The Bot User OAuth Bearer Token is required for posting messages to Slack.")
+public class SlackRecordSink extends AbstractControllerService implements RecordSinkService {
+
+    private static final String SLACK_API_URL = "https://slack.com/api";
+
+    public static final PropertyDescriptor API_URL = new 
PropertyDescriptor.Builder()
+            .name("api-url")
+            .displayName("API URL")
+            .description("Slack Web API URL for posting text messages to channels." +
+                    " It only needs to be changed if Slack changes its API URL.")
+            .required(true)
+            .defaultValue(SLACK_API_URL)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor ACCESS_TOKEN = new 
PropertyDescriptor.Builder()
+            .name("access-token")
+            .displayName("Access Token")
+            .description("Bot OAuth Token used for authenticating and authorizing the Slack request sent by NiFi.")
+            .required(true)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CHANNEL_ID = new 
PropertyDescriptor.Builder()
+            .name("channel-id")
+            .displayName("Channel ID")
+            .description("Slack channel, private group, or IM channel to send the message to. Use Channel ID instead of the name.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INPUT_CHARACTER_SET = new 
PropertyDescriptor.Builder()
+            .name("input-character-set")
+            .displayName("Input Character Set")
+            .description("Specifies the character set of the records used to generate the Slack message.")
+            .required(true)
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .defaultValue(StandardCharsets.UTF_8.name())
+            .build();
+
+    public static final PropertyDescriptor WEB_SERVICE_CLIENT_PROVIDER = new 
PropertyDescriptor.Builder()
+            .name("web-service-client-provider")
+            .displayName("Web Service Client Provider")
+            .description("Controller service to provide HTTP client for communicating with Slack API")
+            .required(true)
+            .identifiesControllerService(WebClientServiceProvider.class)
+            .build();
+
+    private volatile RecordSetWriterFactory writerFactory;
+    private SlackRestService service;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Collections.unmodifiableList(Arrays.asList(
+                API_URL,
+                ACCESS_TOKEN,
+                CHANNEL_ID,
+                INPUT_CHARACTER_SET,
+                RECORD_WRITER_FACTORY,
+                WEB_SERVICE_CLIENT_PROVIDER
+        ));
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        writerFactory = 
context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+        final WebClientServiceProvider webClientServiceProvider = context
+                .getProperty(WEB_SERVICE_CLIENT_PROVIDER)
+                .asControllerService(WebClientServiceProvider.class);
+        final String accessToken = 
context.getProperty(ACCESS_TOKEN).getValue();
+        final String apiUrl = context.getProperty(API_URL).getValue();
+        final String charset = 
context.getProperty(INPUT_CHARACTER_SET).getValue();
+        service = new SlackRestService(webClientServiceProvider, accessToken, 
apiUrl, charset, getLogger());
+    }
+
+    @Override
+    public WriteResult sendData(final RecordSet recordSet, final Map<String, 
String> attributes, final boolean sendZeroResults) throws IOException {
+        WriteResult writeResult;
+        final String channel = 
getConfigurationContext().getProperty(CHANNEL_ID).getValue();
+        int recordCount = 0;
+        try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            try (final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), recordSet.getSchema(), out, 
attributes)) {
+                writer.beginRecordSet();
+                Record record = recordSet.next();
+                while (record != null) {
+                    writer.write(record);
+                    writer.flush();
+                    record = recordSet.next();
+                    recordCount++;
+                }
+                writeResult = writer.finishRecordSet();
+                writer.flush();
+            } catch (final SchemaNotFoundException e) {
+                final String errorMessage = String.format("RecordSetWriter could not be created because the schema was not found. The schema name for the RecordSet to write is %s",
+                        recordSet.getSchema().getSchemaName());
+                throw new ProcessException(errorMessage, e);
+            }
+            if (recordCount > 0 || sendZeroResults) {
+                try {
+                    final String message = out.toString();
+                    service.sendMessageToChannel(message, channel);
+                } catch (final SlackRestServiceException e) {
+                    throw new IOException("Failed to send messages to Slack", e);
+                }
+            }
+        }
+        return writeResult;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java
 
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java
new file mode 100644
index 0000000000..3a23eb3e7b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestService.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.OptionalLong;
+
+public class SlackRestService {
+    private static final String POST_MESSAGE_PATH = "chat.postMessage";
+    private final WebClientServiceProvider webClientServiceProvider;
+    private final String accessToken;
+    private final String apiUrl;
+    private final ObjectMapper objectMapper;
+    private final ComponentLog logger;
+    private final String charset;
+
+    public SlackRestService(final WebClientServiceProvider 
webClientServiceProvider,
+                            final String accessToken,
+                            final String apiUrl,
+                            final String charset,
+                            final ComponentLog logger) {
+        this.webClientServiceProvider = webClientServiceProvider;
+        this.accessToken = accessToken;
+        this.apiUrl = apiUrl;
+        this.objectMapper = new ObjectMapper();
+        objectMapper.registerModule(new JavaTimeModule());
+        
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+        this.charset = charset;
+        this.logger = logger;
+    }
+
+    public void sendMessageToChannel(final String message, final String 
channel) throws SlackRestServiceException {
+        final URI apiUri = URI.create(apiUrl);
+        final HttpUriBuilder uriBuilder = 
webClientServiceProvider.getHttpUriBuilder()
+                .scheme(apiUri.getScheme())
+                .host(apiUri.getHost())
+                .encodedPath(apiUri.getPath())
+                .addPathSegment(POST_MESSAGE_PATH);
+        if (apiUri.getPort() != -1) {
+            uriBuilder.port(apiUri.getPort());
+        }
+        final URI uri = uriBuilder.build();
+
+        final ObjectNode requestBodyJson = createRequestBody(channel, message);
+
+        final InputStream requestBodyInputStream;
+        try {
+            requestBodyInputStream = new 
ByteArrayInputStream(objectMapper.writeValueAsBytes(requestBodyJson));
+        } catch (final JsonProcessingException e) {
+            throw new SlackRestServiceException("JSON message serialization failed", e);
+        }
+
+        try (final HttpResponseEntity response = 
webClientServiceProvider.getWebClientService()
+                .post()
+                .uri(uri)
+                .header("Authorization", String.format("Bearer %s", 
accessToken))
+                .header("Content-Type", String.format("application/json; charset=\"%s\"", charset))
+                .body(requestBodyInputStream, 
OptionalLong.of(requestBodyInputStream.available()))
+                .retrieve()) {
+            final int statusCode = response.statusCode();
+            if (!(statusCode >= 200 && statusCode < 300)) {
+                throw new SlackRestServiceException("HTTP error code: " + 
statusCode);
+            }
+
+            try {
+                final PostMessageResponse slackResponse = 
objectMapper.readValue(response.body(), PostMessageResponse.class);
+                checkResponse(slackResponse, channel);
+            } catch (final IOException e) {
+                throw new SlackRestServiceException("JSON response parsing failed", e);
+            }
+
+        } catch (final IOException e) {
+            throw new ProcessException("Slack HTTP request failed", e);
+        }
+    }
+
+    private ObjectNode createRequestBody(final String channel, final String 
message) {
+        final ObjectNode requestBodyJson = objectMapper.createObjectNode();
+        requestBodyJson.put("channel", channel);
+        requestBodyJson.put("text", message);
+        return requestBodyJson;
+    }
+
+    private void checkResponse(final PostMessageResponse response, final 
String channel) throws SlackRestServiceException {
+        if (!response.isOk()) {
+            throw new SlackRestServiceException("Slack error response: " + 
response.getError());
+        }
+
+        if (response.getWarning() != null) {
+            logger.warn("Post message to channel [{}] warning: {}", channel, 
response.getWarning());
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestServiceException.java
 
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestServiceException.java
new file mode 100644
index 0000000000..b2f09baf62
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/java/org/apache/nifi/services/slack/SlackRestServiceException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+class SlackRestServiceException extends Exception {
+    SlackRestServiceException(final String message) {
+        super(message);
+    }
+
+    SlackRestServiceException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000000..5069eb222b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.services.slack.SlackRecordSink
diff --git 
a/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/services/slack/TestSlackRecordSink.java
 
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/services/slack/TestSlackRecordSink.java
new file mode 100644
index 0000000000..8ab2896d7b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/src/test/java/org/apache/nifi/services/slack/TestSlackRecordSink.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.slack;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
+import 
org.apache.nifi.web.client.provider.service.StandardWebClientServiceProvider;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestSlackRecordSink {
+    private static final String RESPONSE_SUCCESS_TEXT_MSG_WITH_TIMESTAMP = "{\"ok\": true, \"ts\": \"1503435956.000247\"}";
+    private static final String RESPONSE_WARNING = "{\"ok\": true, \"warning\": \"slack-warning\"}";
+    private static final String RESPONSE_ERROR = "{\"ok\": false, \"error\": \"slack-error\"}";
+    private static final String RESPONSE_EMPTY_JSON = "{}";
+    private static final String RESPONSE_INVALID_JSON = "{invalid-json}";
+
+    private static final String CHANNEL_NAME = "my-channel";
+    private static final String BEARER_TOKEN = "bearer-token";
+
+    private TestRunner testRunner;
+    private MockWebServer mockWebServer;
+    private SlackRecordSink slackRecordSink;
+    private MockRecordWriter writerFactory;
+    private RecordSet recordSet;
+    private String recordContentsAsString;
+    private ObjectMapper mapper;
+
+    @BeforeEach
+    public void setup() throws InitializationException, IOException {
+        mapper = new ObjectMapper();
+
+        mockWebServer = new MockWebServer();
+        mockWebServer.start();
+        String url = mockWebServer.url("/api/").toString();
+
+        testRunner = TestRunners.newTestRunner(NoOpProcessor.class);
+
+        final WebClientServiceProvider webClientServiceProvider = new 
StandardWebClientServiceProvider();
+        testRunner.addControllerService("webClientServiceProvider", 
webClientServiceProvider);
+        testRunner.enableControllerService(webClientServiceProvider);
+
+        slackRecordSink = new SlackRecordSink();
+
+        testRunner.addControllerService("slackRecordSink", slackRecordSink);
+        testRunner.setProperty(slackRecordSink, SlackRecordSink.API_URL, url);
+        testRunner.setProperty(slackRecordSink, SlackRecordSink.ACCESS_TOKEN, 
BEARER_TOKEN);
+        testRunner.setProperty(slackRecordSink, SlackRecordSink.CHANNEL_ID, 
CHANNEL_NAME);
+        testRunner.setProperty(slackRecordSink, 
SlackRecordSink.WEB_SERVICE_CLIENT_PROVIDER, "webClientServiceProvider");
+
+        writerFactory = new MockRecordWriter();
+        testRunner.addControllerService("writer", writerFactory);
+        testRunner.setProperty(slackRecordSink, 
SlackRecordSink.RECORD_WRITER_FACTORY, "writer");
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("a", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("b", 
RecordFieldType.BOOLEAN.getDataType()));
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> valueMap1 = new HashMap<>();
+        valueMap1.put("a", "Hello");
+        valueMap1.put("b", true);
+        final Record record1 = new MapRecord(schema, valueMap1);
+
+        final Map<String, Object> valueMap2 = new HashMap<>();
+        valueMap2.put("a", "World");
+        valueMap2.put("b", false);
+        final Record record2 = new MapRecord(schema, valueMap2);
+
+        recordContentsAsString = "\"Hello\",\"true\"\n\"World\",\"false\"\n";
+        recordSet = RecordSet.of(schema, record1, record2);
+    }
+
+    @AfterEach
+    public void cleanUp() throws IOException {
+        mockWebServer.shutdown();
+    }
+
+    @Test
+    public void testSendMessage() throws IOException {
+        testRunner.enableControllerService(writerFactory);
+        testRunner.assertValid(slackRecordSink);
+        testRunner.enableControllerService(slackRecordSink);
+
+        mockWebServer.enqueue(new 
MockResponse().setResponseCode(200).setBody(RESPONSE_SUCCESS_TEXT_MSG_WITH_TIMESTAMP));
+
+        final WriteResult writeResult = slackRecordSink.sendData(recordSet, 
Collections.emptyMap(), false);
+
+        assertNotNull(writeResult);
+        assertEquals(2, writeResult.getRecordCount());
+        assertEquals(Collections.EMPTY_MAP, writeResult.getAttributes());
+        final JsonNode requestBodyJson = getRequestBodyJson();
+        assertEquals(CHANNEL_NAME, requestBodyJson.get("channel").asText());
+        assertEquals(recordContentsAsString, 
requestBodyJson.get("text").asText());
+    }
+
+    @Test
+    public void testNotValidIfChannelEmpty() {
+        testRunner.setProperty(slackRecordSink, SlackRecordSink.CHANNEL_ID, 
(String) null);
+
+        testRunner.enableControllerService(writerFactory);
+        testRunner.assertNotValid(slackRecordSink);
+    }
+
+    @Test
+    public void testNotValidIfBearerTokenEmpty() {
+        testRunner.setProperty(slackRecordSink, SlackRecordSink.ACCESS_TOKEN, 
(String) null);
+
+        testRunner.enableControllerService(writerFactory);
+        testRunner.assertNotValid(slackRecordSink);
+    }
+
+    @Test
+    public void testFailureWhenHttpErrorCodeReturned() {
+        testRunner.enableControllerService(writerFactory);
+        testRunner.assertValid(slackRecordSink);
+        testRunner.enableControllerService(slackRecordSink);
+
+        mockWebServer.enqueue(new MockResponse().setResponseCode(500));
+
+        final IOException e = assertThrows(IOException.class, () -> 
slackRecordSink.sendData(recordSet, Collections.emptyMap(), false));
+        assertTrue(e.getCause().getMessage().contains("500"));
+    }
+
+    @Test
+    public void testFailureWhenSlackReturnsError() {
+        testRunner.enableControllerService(writerFactory);
+        testRunner.assertValid(slackRecordSink);
+        testRunner.enableControllerService(slackRecordSink);
+
+        mockWebServer.enqueue(new 
MockResponse().setResponseCode(200).setBody(RESPONSE_ERROR));
+
+        final IOException e = assertThrows(IOException.class, () -> 
slackRecordSink.sendData(recordSet, Collections.emptyMap(), false));
+        assertTrue(e.getCause().getMessage().contains("slack-error"));
+    }
+
+    @Test
+    public void testNoFailureWhenSlackReturnsWarning() {
+        testRunner.enableControllerService(writerFactory);
+        testRunner.assertValid(slackRecordSink);
+        testRunner.enableControllerService(slackRecordSink);
+
+        mockWebServer.enqueue(new 
MockResponse().setResponseCode(200).setBody(RESPONSE_WARNING));
+
+        assertDoesNotThrow(() -> {
+            slackRecordSink.sendData(recordSet, Collections.emptyMap(), false);
+        });
+    }
+
+    @Test
+    public void testFailureWhenSlackReturnsEmptyJson() {
+        testRunner.enableControllerService(writerFactory);
+        testRunner.assertValid(slackRecordSink);
+        testRunner.enableControllerService(slackRecordSink);
+
+        mockWebServer.enqueue(new 
MockResponse().setResponseCode(200).setBody(RESPONSE_EMPTY_JSON));
+
+        final IOException e = assertThrows(IOException.class, () -> 
slackRecordSink.sendData(recordSet, Collections.emptyMap(), false));
+        assertTrue(e.getCause().getMessage().contains("null"));
+    }
+
+    @Test
+    public void testFailureWhenSlackReturnsInvalidJson() {
+        testRunner.enableControllerService(writerFactory);
+        testRunner.assertValid(slackRecordSink);
+        testRunner.enableControllerService(slackRecordSink);
+
+        mockWebServer.enqueue(new 
MockResponse().setResponseCode(200).setBody(RESPONSE_INVALID_JSON));
+
+        final IOException e = assertThrows(IOException.class, () -> 
slackRecordSink.sendData(recordSet, Collections.emptyMap(), false));
+        assertTrue(e.getCause().getMessage().contains("parsing"));
+    }
+
+    private JsonNode getRequestBodyJson() {
+        try {
+            final RecordedRequest recordedRequest = 
mockWebServer.takeRequest();
+            return mapper.readTree(recordedRequest.getBody().inputStream());
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

Reply via email to