This is an automated email from the ASF dual-hosted git repository.
pandaapo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new afb3a669c [ISSUE#4419] Add slack sink connector. (#4562)
afb3a669c is described below
commit afb3a669c2afcd5db2b7c49b95899f7bd478388e
Author: yanrongzhen <[email protected]>
AuthorDate: Mon Nov 20 08:33:57 2023 +0800
[ISSUE#4419] Add slack sink connector. (#4562)
* Add slack sink connector.
* fix code review
---
eventmesh-connectors/README.md | 96 ++++++++---------
.../eventmesh-connector-slack/build.gradle | 37 +++++++
.../slack/config/SlackConnectServerConfig.java | 30 ++++++
.../connector/slack/server/SlackConnectServer.java | 38 +++++++
.../slack/sink/config/SinkConnectorConfig.java | 30 ++++++
.../slack/sink/config/SlackSinkConfig.java | 30 ++++++
.../slack/sink/connector/SlackSinkConnector.java | 117 +++++++++++++++++++++
.../src/main/resources/server-config.yml | 18 ++++
.../src/main/resources/sink-config.yml | 30 ++++++
.../sink/connector/SlackSinkConnectorTest.java | 91 ++++++++++++++++
.../src/test/resources/server-config.yml | 18 ++++
.../src/test/resources/sink-config.yml | 30 ++++++
settings.gradle | 1 +
13 files changed, 519 insertions(+), 47 deletions(-)
diff --git a/eventmesh-connectors/README.md b/eventmesh-connectors/README.md
index 1ea07e3e1..8b14734f5 100644
--- a/eventmesh-connectors/README.md
+++ b/eventmesh-connectors/README.md
@@ -18,50 +18,52 @@ Add a new connector by implementing the source/sink interface using :
## Connector Status
-| Connector Name | Type | Status |
-|:------------------------------------------------:|:-----------------:|:--------:|
-| [RocketMQ](eventmesh-connector-rocketmq) | Source | ✅ |
-| [RocketMQ](eventmesh-connector-rocketmq) | Sink | ✅ |
-| ChatGPT | Source | ⬜ |
-| ChatGPT | Sink | ⬜ |
-| ClickHouse | Source | ⬜ |
-| ClickHouse | Sink | ⬜ |
-| DingDing | Source | ⬜ |
-| [DingDing](eventmesh-connector-dingding) | Sink | ✅ |
-| Email | Source | ⬜ |
-| Email | Sink | ⬜ |
-| FeiShu | Source | ⬜ |
-| FeiShu | Sink | ⬜ |
-| [File](eventmesh-connector-file) | Source | ✅ |
-| [File](eventmesh-connector-file) | Sink | ✅ |
-| Github | Source | ⬜ |
-| Github | Sink | ⬜ |
-| Http | Source | ⬜ |
-| Http | Sink | ⬜ |
-| Jdbc | Source | ⬜ |
-| [Jdbc](eventmesh-connector-jdbc) | Sink | ✅ |
-| [Kafka](eventmesh-connector-kafka) | Source | ✅ |
-| [Kafka](eventmesh-connector-kafka) | Sink | ✅ |
-| [Knative](eventmesh-connector-knative) | Source | ✅ |
-| [Knative](eventmesh-connector-knative) | Sink | ✅ |
-| [MongoDB](eventmesh-connector-mongodb) | Source | ✅ |
-| [MongoDB](eventmesh-connector-mongodb) | Sink | ✅ |
-| [OpenFunction](eventmesh-connector-openfunction) | Source | ✅ |
-| [OpenFunction](eventmesh-connector-openfunction) | Sink | ✅ |
-| [Pravega](eventmesh-connector-pravega) | Source | ✅ |
-| [Pravega](eventmesh-connector-pravega) | Sink | ✅ |
-| [Promethues](eventmesh-connector-prometheus) | Source | ✅ |
-| [Promethues](eventmesh-connector-prometheus) | Sink | ⬜ |
-| [Pulsar](eventmesh-connector-pulsar) | Source | ✅ |
-| [Pulsar](eventmesh-connector-pulsar) | Sink | ✅ |
-| [Rabbitmq](eventmesh-connector-rabbitmq) | Source | ✅ |
-| [Rabbitmq](eventmesh-connector-rabbitmq) | Sink | ✅ |
-| [Redis](eventmesh-connector-redis) | Source | ✅ |
-| [Redis](eventmesh-connector-redis) | Sink | ✅ |
-| S3File | Source | ⬜ |
-| [S3File](eventmesh-connector-s3) | Sink | ✅ |
-| [Spring](eventmesh-connector-spring) | Source | ✅ |
-| [Spring](eventmesh-connector-spring) | Sink | ✅ |
-| WeCom | Source | ⬜ |
-| [WeCom](eventmesh-connector-wecom) | Sink | ✅ |
-| More connectors will be added... | Source/Sink | N/A |
\ No newline at end of file
+| Connector Name | Type | Status |
+|:------------------------------------------------:|:-----------:|:-------:|
+| [RocketMQ](eventmesh-connector-rocketmq) | Source | ✅ |
+| [RocketMQ](eventmesh-connector-rocketmq) | Sink | ✅ |
+| ChatGPT | Source | ⬜ |
+| ChatGPT | Sink | ⬜ |
+| ClickHouse | Source | ⬜ |
+| ClickHouse | Sink | ⬜ |
+| DingDing | Source | ⬜ |
+| [DingDing](eventmesh-connector-dingding) | Sink | ✅ |
+| Email | Source | ⬜ |
+| Email | Sink | ⬜ |
+| FeiShu | Source | ⬜ |
+| FeiShu | Sink | ⬜ |
+| [File](eventmesh-connector-file) | Source | ✅ |
+| [File](eventmesh-connector-file) | Sink | ✅ |
+| Github | Source | ⬜ |
+| Github | Sink | ⬜ |
+| Http | Source | ⬜ |
+| Http | Sink | ⬜ |
+| Jdbc | Source | ⬜ |
+| [Jdbc](eventmesh-connector-jdbc) | Sink | ✅ |
+| [Kafka](eventmesh-connector-kafka) | Source | ✅ |
+| [Kafka](eventmesh-connector-kafka) | Sink | ✅ |
+| [Knative](eventmesh-connector-knative) | Source | ✅ |
+| [Knative](eventmesh-connector-knative) | Sink | ✅ |
+| [MongoDB](eventmesh-connector-mongodb) | Source | ✅ |
+| [MongoDB](eventmesh-connector-mongodb) | Sink | ✅ |
+| [OpenFunction](eventmesh-connector-openfunction) | Source | ✅ |
+| [OpenFunction](eventmesh-connector-openfunction) | Sink | ✅ |
+| [Pravega](eventmesh-connector-pravega) | Source | ✅ |
+| [Pravega](eventmesh-connector-pravega) | Sink | ✅ |
+| [Promethues](eventmesh-connector-prometheus) | Source | ✅ |
+| [Promethues](eventmesh-connector-prometheus) | Sink | ⬜ |
+| [Pulsar](eventmesh-connector-pulsar) | Source | ✅ |
+| [Pulsar](eventmesh-connector-pulsar) | Sink | ✅ |
+| [Rabbitmq](eventmesh-connector-rabbitmq) | Source | ✅ |
+| [Rabbitmq](eventmesh-connector-rabbitmq) | Sink | ✅ |
+| [Redis](eventmesh-connector-redis) | Source | ✅ |
+| [Redis](eventmesh-connector-redis) | Sink | ✅ |
+| S3File | Source | ⬜ |
+| [S3File](eventmesh-connector-s3) | Sink | ✅ |
+| [Slack](eventmesh-connector-slack) | Source | ⬜ |
+| [Slack](eventmesh-connector-slack) | Sink | ✅ |
+| [Spring](eventmesh-connector-spring) | Source | ✅ |
+| [Spring](eventmesh-connector-spring) | Sink | ✅ |
+| WeCom | Source | ⬜ |
+| [WeCom](eventmesh-connector-wecom) | Sink | ✅ |
+| More connectors will be added... | Source/Sink | N/A |
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-slack/build.gradle b/eventmesh-connectors/eventmesh-connector-slack/build.gradle
new file mode 100644
index 000000000..ad66b78b9
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-slack/build.gradle
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+configurations {
+ implementation.exclude group: 'ch.qos.logback', module: 'logback-classic'
+ implementation.exclude group: 'log4j', module: 'log4j'
+ testImplementation.exclude group: 'org.apache.logging.log4j', module: 'log4j-to-slf4j'
+}
+
+dependencies {
+ implementation project(":eventmesh-common")
+ implementation project(":eventmesh-sdks:eventmesh-sdk-java")
+ implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
+
+ implementation "com.slack.api:bolt:1.1.+"
+ implementation 'com.google.guava:guava'
+
+ compileOnly 'org.projectlombok:lombok'
+ annotationProcessor 'org.projectlombok:lombok'
+
+ testImplementation "org.mockito:mockito-core"
+ testImplementation "org.mockito:mockito-junit-jupiter"
+}
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/config/SlackConnectServerConfig.java b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/config/SlackConnectServerConfig.java
new file mode 100644
index 000000000..515c5af6c
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/config/SlackConnectServerConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.slack.config;
+
+import org.apache.eventmesh.openconnect.api.config.Config;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class SlackConnectServerConfig extends Config {
+
+ private boolean sinkEnable;
+}
diff --git a/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/server/SlackConnectServer.java b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/server/SlackConnectServer.java
new file mode 100644
index 000000000..b59f0657a
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/server/SlackConnectServer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.slack.server;
+
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.connector.slack.config.SlackConnectServerConfig;
+import org.apache.eventmesh.connector.slack.sink.connector.SlackSinkConnector;
+import org.apache.eventmesh.openconnect.Application;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+public class SlackConnectServer {
+
+ public static void main(String[] args) throws Exception {
+
+ SlackConnectServerConfig slackConnectServerConfig = ConfigUtil.parse(SlackConnectServerConfig.class,
+ Constants.CONNECT_SERVER_CONFIG_FILE_NAME);
+
+ if (slackConnectServerConfig.isSinkEnable()) {
+ Application application = new Application();
+ application.run(SlackSinkConnector.class);
+ }
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/config/SinkConnectorConfig.java b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/config/SinkConnectorConfig.java
new file mode 100644
index 000000000..41884a94a
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/config/SinkConnectorConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.slack.sink.config;
+
+import lombok.Data;
+
+@Data
+public class SinkConnectorConfig {
+
+ private String connectorName;
+
+ private String appToken;
+
+ private String channelId;
+}
diff --git a/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/config/SlackSinkConfig.java b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/config/SlackSinkConfig.java
new file mode 100644
index 000000000..016cd9ae9
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/config/SlackSinkConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.slack.sink.config;
+
+import org.apache.eventmesh.openconnect.api.config.SinkConfig;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class SlackSinkConfig extends SinkConfig {
+
+ private SinkConnectorConfig sinkConnectorConfig;
+}
diff --git a/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java
new file mode 100644
index 000000000..a026f2aa2
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.slack.sink.connector;
+
+import org.apache.eventmesh.connector.slack.sink.config.SlackSinkConfig;
+import org.apache.eventmesh.openconnect.api.config.Config;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
+import org.apache.eventmesh.openconnect.api.sink.Sink;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Objects;
+
+import com.slack.api.Slack;
+import com.slack.api.methods.MethodsClient;
+import com.slack.api.methods.response.chat.ChatPostMessageResponse;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Slack sink connector.
+ * Slack doc: <a href="https://api.slack.com/messaging/sending">...</a>
+ */
+@Slf4j
+public class SlackSinkConnector implements Sink {
+
+ private SlackSinkConfig sinkConfig;
+
+ private volatile boolean isRunning = false;
+
+ private MethodsClient client;
+
+ @Override
+ public Class<? extends Config> configClass() {
+ return SlackSinkConfig.class;
+ }
+
+ @Override
+ public void init(Config config) throws Exception {
+ // init config for dingding sink connector
+ this.sinkConfig = (SlackSinkConfig) config;
+ this.client = Slack.getInstance().methods();
+ }
+
+ @Override
+ public void init(ConnectorContext connectorContext) throws Exception {
+ // init config for dingding source connector
+ SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext;
+ this.sinkConfig = (SlackSinkConfig) sinkConnectorContext.getSinkConfig();
+ this.client = Slack.getInstance().methods();
+ }
+
+ @Override
+ public void start() {
+ isRunning = true;
+ }
+
+ @Override
+ public void commit(ConnectRecord record) {
+
+ }
+
+ @Override
+ public String name() {
+ return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
+ }
+
+ @Override
+ public void stop() {
+ isRunning = false;
+ }
+
+ public boolean isRunning() {
+ return isRunning;
+ }
+
+ @SneakyThrows
+ @Override
+ public void put(List<ConnectRecord> sinkRecords) {
+ for (ConnectRecord record : sinkRecords) {
+ publishMessage(record);
+ }
+ }
+
+ private void publishMessage(ConnectRecord record) {
+ try {
+ ChatPostMessageResponse response = client.chatPostMessage(r -> r
+ .token(sinkConfig.getSinkConnectorConfig().getAppToken())
+ .channel(sinkConfig.getSinkConnectorConfig().getChannelId())
+ .text(new String((byte[]) record.getData())));
+ if (Objects.nonNull(response) && StringUtils.isNotBlank(response.getError())) {
+ throw new IllegalAccessException(response.getError());
+ }
+ } catch (Exception e) {
+ log.error("Send message to slack error.", e);
+ }
+ }
+}
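
Editor's note on publishMessage, as a sketch only and not part of the commit: the connector builds the message text with new String(byte[]), which uses the platform default charset, and treats a non-blank error string on the response as a failure. The JDK-only example below mirrors those two steps with an explicit UTF-8 decode. The class and method names (SlackPayloadSketch, decode, isFailure) are hypothetical, the handling of non-byte[] payloads is an assumption, and isFailure only approximates StringUtils.isNotBlank.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the commit: mirrors the decode-and-check
// steps of publishMessage using only the JDK.
final class SlackPayloadSketch {

    // Decode a record payload as UTF-8 text. The commit calls
    // new String(byte[]), which falls back to the platform default charset.
    public static String decode(Object data) {
        if (data instanceof byte[]) {
            return new String((byte[]) data, StandardCharsets.UTF_8);
        }
        // Assumption: any other payload type is rendered via String.valueOf.
        return String.valueOf(data);
    }

    // Approximates the commit's check on ChatPostMessageResponse.getError():
    // a non-blank error string means the post failed.
    public static boolean isFailure(String error) {
        return error != null && !error.trim().isEmpty();
    }

    public static void main(String[] args) {
        byte[] payload = "Hello, EventMesh!".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(payload));
        System.out.println(isFailure("channel_not_found"));
        System.out.println(isFailure(null));
    }
}
```

Decoding with an explicit charset keeps the text stable across hosts whose default encoding differs, which matters for the non-ASCII messages a chat sink is likely to carry.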
diff --git a/eventmesh-connectors/eventmesh-connector-slack/src/main/resources/server-config.yml b/eventmesh-connectors/eventmesh-connector-slack/src/main/resources/server-config.yml
new file mode 100644
index 000000000..20d6e6d59
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-slack/src/main/resources/server-config.yml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sinkEnable: true
diff --git a/eventmesh-connectors/eventmesh-connector-slack/src/main/resources/sink-config.yml b/eventmesh-connectors/eventmesh-connector-slack/src/main/resources/sink-config.yml
new file mode 100644
index 000000000..062eaf349
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-slack/src/main/resources/sink-config.yml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+ meshAddress: 127.0.0.1:10000
+ subject: TEST-TOPIC-SLACK
+ idc: FT
+ env: PRD
+ group: slackSink
+ appId: 5034
+ userName: slackSinkUser
+ passWord: slackPassWord
+sinkConnectorConfig:
+ connectorName: slackSink
+ appToken: slackAppToken
+ channelId: slackChannelId
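
Editor's note on sink-config.yml, as a sketch only and not part of the commit: the three sinkConnectorConfig values ship as placeholders (slackSink, slackAppToken, slackChannelId) and must be replaced with real values before the sink can post. A pre-flight check along the following lines could catch an unedited file. The class SlackSinkSettingsCheck and its validate method are hypothetical; the commit itself performs no such validation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check, not part of the commit.
final class SlackSinkSettingsCheck {

    // Returns a list of problems; an empty list means the three values look usable.
    public static List<String> validate(String connectorName, String appToken, String channelId) {
        List<String> problems = new ArrayList<>();
        if (isBlank(connectorName)) {
            problems.add("connectorName is empty");
        }
        // "slackAppToken" and "slackChannelId" are the placeholders shipped in sink-config.yml.
        if (isBlank(appToken) || "slackAppToken".equals(appToken)) {
            problems.add("appToken is empty or still the placeholder");
        }
        if (isBlank(channelId) || "slackChannelId".equals(channelId)) {
            problems.add("channelId is empty or still the placeholder");
        }
        return problems;
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    public static void main(String[] args) {
        // Values as shipped in the commit: two placeholder problems are reported.
        System.out.println(validate("slackSink", "slackAppToken", "slackChannelId"));
        // Made-up example values that pass the check.
        System.out.println(validate("slackSink", "xoxb-example", "C0123456789"));
    }
}
```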
diff --git a/eventmesh-connectors/eventmesh-connector-slack/src/test/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-slack/src/test/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnectorTest.java
new file mode 100644
index 000000000..3f0a32755
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-slack/src/test/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnectorTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.slack.sink.connector;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.eventmesh.connector.slack.sink.config.SlackSinkConfig;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.platform.commons.support.HierarchyTraversalMode;
+import org.junit.platform.commons.support.ReflectionSupport;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.slack.api.RequestConfigurator;
+import com.slack.api.methods.MethodsClient;
+import com.slack.api.methods.response.chat.ChatPostMessageResponse;
+
+@ExtendWith(MockitoExtension.class)
+public class SlackSinkConnectorTest {
+
+ private SlackSinkConnector sinkConnector;
+
+ @Mock
+ private MethodsClient client;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ sinkConnector = new SlackSinkConnector();
+ SlackSinkConfig sinkConfig = (SlackSinkConfig) ConfigUtil.parse(sinkConnector.configClass());
+ sinkConnector.init(sinkConfig);
+ doReturn(new ChatPostMessageResponse())
+ .when(client).chatPostMessage(any(RequestConfigurator.class));
+ Field clientField = ReflectionSupport.findFields(sinkConnector.getClass(),
+ (f) -> f.getName().equals("client"),
+ HierarchyTraversalMode.BOTTOM_UP).get(0);
+ clientField.setAccessible(true);
+ clientField.set(sinkConnector, client);
+ sinkConnector.start();
+ }
+
+ @Test
+ public void testSendMessageToSlack() throws Exception {
+ final int times = 3;
+ List<ConnectRecord> records = new ArrayList<>();
+ for (int i = 0; i < times; i++) {
+ RecordPartition partition = new RecordPartition();
+ RecordOffset offset = new RecordOffset();
+ ConnectRecord connectRecord = new ConnectRecord(partition, offset,
+ System.currentTimeMillis(), "Hello, EventMesh!".getBytes(StandardCharsets.UTF_8));
+ records.add(connectRecord);
+ }
+ sinkConnector.put(records);
+ verify(client, times(times)).chatPostMessage(any(RequestConfigurator.class));
+ }
+
+ @AfterEach
+ public void tearDown() {
+ sinkConnector.stop();
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-slack/src/test/resources/server-config.yml b/eventmesh-connectors/eventmesh-connector-slack/src/test/resources/server-config.yml
new file mode 100644
index 000000000..20d6e6d59
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-slack/src/test/resources/server-config.yml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sinkEnable: true
diff --git a/eventmesh-connectors/eventmesh-connector-slack/src/test/resources/sink-config.yml b/eventmesh-connectors/eventmesh-connector-slack/src/test/resources/sink-config.yml
new file mode 100644
index 000000000..062eaf349
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-slack/src/test/resources/sink-config.yml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+ meshAddress: 127.0.0.1:10000
+ subject: TEST-TOPIC-SLACK
+ idc: FT
+ env: PRD
+ group: slackSink
+ appId: 5034
+ userName: slackSinkUser
+ passWord: slackPassWord
+sinkConnectorConfig:
+ connectorName: slackSink
+ appToken: slackAppToken
+ channelId: slackChannelId
diff --git a/settings.gradle b/settings.gradle
index 85877303e..9555fc5db 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -47,6 +47,7 @@ include 'eventmesh-connectors:eventmesh-connector-prometheus'
include 'eventmesh-connectors:eventmesh-connector-dingding'
include 'eventmesh-connectors:eventmesh-connector-feishu'
include 'eventmesh-connectors:eventmesh-connector-wecom'
+include 'eventmesh-connectors:eventmesh-connector-slack'
include 'eventmesh-storage-plugin:eventmesh-storage-api'
include 'eventmesh-storage-plugin:eventmesh-storage-standalone'