This is an automated email from the ASF dual-hosted git repository.

pandaapo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new afb3a669c [ISSUE#4419] Add slack sink connector. (#4562)
afb3a669c is described below

commit afb3a669c2afcd5db2b7c49b95899f7bd478388e
Author: yanrongzhen <[email protected]>
AuthorDate: Mon Nov 20 08:33:57 2023 +0800

    [ISSUE#4419] Add slack sink connector. (#4562)
    
    * Add slack sink connector.
    
    * fix code review
---
 eventmesh-connectors/README.md                     |  96 ++++++++---------
 .../eventmesh-connector-slack/build.gradle         |  37 +++++++
 .../slack/config/SlackConnectServerConfig.java     |  30 ++++++
 .../connector/slack/server/SlackConnectServer.java |  38 +++++++
 .../slack/sink/config/SinkConnectorConfig.java     |  30 ++++++
 .../slack/sink/config/SlackSinkConfig.java         |  30 ++++++
 .../slack/sink/connector/SlackSinkConnector.java   | 117 +++++++++++++++++++++
 .../src/main/resources/server-config.yml           |  18 ++++
 .../src/main/resources/sink-config.yml             |  30 ++++++
 .../sink/connector/SlackSinkConnectorTest.java     |  91 ++++++++++++++++
 .../src/test/resources/server-config.yml           |  18 ++++
 .../src/test/resources/sink-config.yml             |  30 ++++++
 settings.gradle                                    |   1 +
 13 files changed, 519 insertions(+), 47 deletions(-)

diff --git a/eventmesh-connectors/README.md b/eventmesh-connectors/README.md
index 1ea07e3e1..8b14734f5 100644
--- a/eventmesh-connectors/README.md
+++ b/eventmesh-connectors/README.md
@@ -18,50 +18,52 @@ Add a new connector by implementing the source/sink 
interface using :
 ## Connector Status
 
 
-|                  Connector Name                  |       Type        |  
Status  |
-|:------------------------------------------------:|:-----------------:|:--------:|
-|     [RocketMQ](eventmesh-connector-rocketmq)     |      Source       |    ✅  
   |
-|     [RocketMQ](eventmesh-connector-rocketmq)     |       Sink        |    ✅  
   |
-|                     ChatGPT                      |      Source       |    ⬜  
   |
-|                     ChatGPT                      |       Sink        |    ⬜  
   |
-|                    ClickHouse                    |      Source       |    ⬜  
   |
-|                    ClickHouse                    |       Sink        |    ⬜  
   |
-|                     DingDing                     |      Source       |    ⬜  
   |
-|     [DingDing](eventmesh-connector-dingding)     |       Sink        |    ✅  
   |
-|                      Email                       |      Source       |    ⬜  
   |
-|                      Email                       |       Sink        |    ⬜  
   |
-|                      FeiShu                      |      Source       |    ⬜  
   |
-|                      FeiShu                      |       Sink        |    ⬜  
   |
-|         [File](eventmesh-connector-file)         |      Source       |    ✅  
   |
-|         [File](eventmesh-connector-file)         |       Sink        |    ✅  
   |
-|                      Github                      |      Source       |    ⬜  
   |
-|                      Github                      |       Sink        |    ⬜  
   |
-|                       Http                       |      Source       |    ⬜  
   |
-|                       Http                       |       Sink        |    ⬜  
   |
-|                       Jdbc                       |      Source       |    ⬜  
   |
-|         [Jdbc](eventmesh-connector-jdbc)         |       Sink        |    ✅  
   |
-|        [Kafka](eventmesh-connector-kafka)        |      Source       |    ✅  
   |
-|        [Kafka](eventmesh-connector-kafka)        |       Sink        |    ✅  
   |
-|      [Knative](eventmesh-connector-knative)      |      Source       |    ✅  
   |
-|      [Knative](eventmesh-connector-knative)      |       Sink        |    ✅  
   |
-|      [MongoDB](eventmesh-connector-mongodb)      |      Source       |    ✅  
   |
-|      [MongoDB](eventmesh-connector-mongodb)      |       Sink        |    ✅  
   |
-| [OpenFunction](eventmesh-connector-openfunction) |      Source       |    ✅  
   |
-| [OpenFunction](eventmesh-connector-openfunction) |       Sink        |    ✅  
   |
-|      [Pravega](eventmesh-connector-pravega)      |      Source       |    ✅  
   |
-|      [Pravega](eventmesh-connector-pravega)      |       Sink        |    ✅  
   |
-|   [Promethues](eventmesh-connector-prometheus)   |      Source       |    ✅  
   |
-|   [Promethues](eventmesh-connector-prometheus)   |       Sink        |    ⬜  
   |
-|       [Pulsar](eventmesh-connector-pulsar)       |      Source       |    ✅  
   |
-|       [Pulsar](eventmesh-connector-pulsar)       |       Sink        |    ✅  
   |
-|     [Rabbitmq](eventmesh-connector-rabbitmq)     |      Source       |    ✅  
   |
-|     [Rabbitmq](eventmesh-connector-rabbitmq)     |       Sink        |    ✅  
   |
-|        [Redis](eventmesh-connector-redis)        |      Source       |    ✅  
   |
-|        [Redis](eventmesh-connector-redis)        |       Sink        |    ✅  
   |
-|                      S3File                      |      Source       |    ⬜  
   |
-|         [S3File](eventmesh-connector-s3)         |       Sink        |    ✅  
   |
-|       [Spring](eventmesh-connector-spring)       |      Source       |    ✅  
   |
-|       [Spring](eventmesh-connector-spring)       |       Sink        |    ✅  
   |
-|                      WeCom                       |      Source       |    ⬜  
   |
-|        [WeCom](eventmesh-connector-wecom)        |       Sink        |    ✅  
   |
-|         More connectors will be added...         |    Source/Sink    |   N/A 
   |
\ No newline at end of file
+|                  Connector Name                  |    Type     | Status  |
+|:------------------------------------------------:|:-----------:|:-------:|
+|     [RocketMQ](eventmesh-connector-rocketmq)     |   Source    |    ✅    |
+|     [RocketMQ](eventmesh-connector-rocketmq)     |    Sink     |    ✅    |
+|                     ChatGPT                      |   Source    |    ⬜    |
+|                     ChatGPT                      |    Sink     |    ⬜    |
+|                    ClickHouse                    |   Source    |    ⬜    |
+|                    ClickHouse                    |    Sink     |    ⬜    |
+|                     DingDing                     |   Source    |    ⬜    |
+|     [DingDing](eventmesh-connector-dingding)     |    Sink     |    ✅    |
+|                      Email                       |   Source    |    ⬜    |
+|                      Email                       |    Sink     |    ⬜    |
+|                      FeiShu                      |   Source    |    ⬜    |
+|                      FeiShu                      |    Sink     |    ⬜    |
+|         [File](eventmesh-connector-file)         |   Source    |    ✅    |
+|         [File](eventmesh-connector-file)         |    Sink     |    ✅    |
+|                      Github                      |   Source    |    ⬜    |
+|                      Github                      |    Sink     |    ⬜    |
+|                       Http                       |   Source    |    ⬜    |
+|                       Http                       |    Sink     |    ⬜    |
+|                       Jdbc                       |   Source    |    ⬜    |
+|         [Jdbc](eventmesh-connector-jdbc)         |    Sink     |    ✅    |
+|        [Kafka](eventmesh-connector-kafka)        |   Source    |    ✅    |
+|        [Kafka](eventmesh-connector-kafka)        |    Sink     |    ✅    |
+|      [Knative](eventmesh-connector-knative)      |   Source    |    ✅    |
+|      [Knative](eventmesh-connector-knative)      |    Sink     |    ✅    |
+|      [MongoDB](eventmesh-connector-mongodb)      |   Source    |    ✅    |
+|      [MongoDB](eventmesh-connector-mongodb)      |    Sink     |    ✅    |
+| [OpenFunction](eventmesh-connector-openfunction) |   Source    |    ✅    |
+| [OpenFunction](eventmesh-connector-openfunction) |    Sink     |    ✅    |
+|      [Pravega](eventmesh-connector-pravega)      |   Source    |    ✅    |
+|      [Pravega](eventmesh-connector-pravega)      |    Sink     |    ✅    |
+|   [Prometheus](eventmesh-connector-prometheus)   |   Source    |    ✅    |
+|   [Prometheus](eventmesh-connector-prometheus)   |    Sink     |    ⬜    |
+|       [Pulsar](eventmesh-connector-pulsar)       |   Source    |    ✅    |
+|       [Pulsar](eventmesh-connector-pulsar)       |    Sink     |    ✅    |
+|     [Rabbitmq](eventmesh-connector-rabbitmq)     |   Source    |    ✅    |
+|     [Rabbitmq](eventmesh-connector-rabbitmq)     |    Sink     |    ✅    |
+|        [Redis](eventmesh-connector-redis)        |   Source    |    ✅    |
+|        [Redis](eventmesh-connector-redis)        |    Sink     |    ✅    |
+|                      S3File                      |   Source    |    ⬜    |
+|         [S3File](eventmesh-connector-s3)         |    Sink     |    ✅    |
+|        [Slack](eventmesh-connector-slack)        |   Source    |    ⬜    |
+|        [Slack](eventmesh-connector-slack)        |    Sink     |    ✅    |
+|       [Spring](eventmesh-connector-spring)       |   Source    |    ✅    |
+|       [Spring](eventmesh-connector-spring)       |    Sink     |    ✅    |
+|                      WeCom                       |   Source    |    ⬜    |
+|        [WeCom](eventmesh-connector-wecom)        |    Sink     |    ✅    |
+|         More connectors will be added...         | Source/Sink |   N/A   |
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-slack/build.gradle 
b/eventmesh-connectors/eventmesh-connector-slack/build.gradle
new file mode 100644
index 000000000..ad66b78b9
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-slack/build.gradle
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+configurations {
+    implementation.exclude group: 'ch.qos.logback', module: 'logback-classic'
+    implementation.exclude group: 'log4j', module: 'log4j'
+    testImplementation.exclude group: 'org.apache.logging.log4j', module: 'log4j-to-slf4j'
+}
+
+dependencies {
+    implementation project(":eventmesh-common")
+    implementation project(":eventmesh-sdks:eventmesh-sdk-java")
+    implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
+    
+    implementation "com.slack.api:bolt:1.1.+"
+    implementation 'com.google.guava:guava'
+
+    compileOnly 'org.projectlombok:lombok'
+    annotationProcessor 'org.projectlombok:lombok'
+
+    testImplementation "org.mockito:mockito-core"
+    testImplementation "org.mockito:mockito-junit-jupiter"
+}
\ No newline at end of file
diff --git 
a/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/config/SlackConnectServerConfig.java
 
b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/config/SlackConnectServerConfig.java
new file mode 100644
index 000000000..515c5af6c
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/config/SlackConnectServerConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.slack.config;
+
+import org.apache.eventmesh.openconnect.api.config.Config;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class SlackConnectServerConfig extends Config {
+
+    private boolean sinkEnable;
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/server/SlackConnectServer.java
 
b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/server/SlackConnectServer.java
new file mode 100644
index 000000000..b59f0657a
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/server/SlackConnectServer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.slack.server;
+
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.connector.slack.config.SlackConnectServerConfig;
+import org.apache.eventmesh.connector.slack.sink.connector.SlackSinkConnector;
+import org.apache.eventmesh.openconnect.Application;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+public class SlackConnectServer {
+
+    public static void main(String[] args) throws Exception {
+
+        SlackConnectServerConfig slackConnectServerConfig = ConfigUtil.parse(SlackConnectServerConfig.class,
+            Constants.CONNECT_SERVER_CONFIG_FILE_NAME);
+
+        if (slackConnectServerConfig.isSinkEnable()) {
+            Application application = new Application();
+            application.run(SlackSinkConnector.class);
+        }
+    }
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/config/SinkConnectorConfig.java
 
b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/config/SinkConnectorConfig.java
new file mode 100644
index 000000000..41884a94a
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/config/SinkConnectorConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.slack.sink.config;
+
+import lombok.Data;
+
+@Data
+public class SinkConnectorConfig {
+
+    private String connectorName;
+
+    private String appToken;
+
+    private String channelId;
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/config/SlackSinkConfig.java
 
b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/config/SlackSinkConfig.java
new file mode 100644
index 000000000..016cd9ae9
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/config/SlackSinkConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.slack.sink.config;
+
+import org.apache.eventmesh.openconnect.api.config.SinkConfig;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class SlackSinkConfig extends SinkConfig {
+
+    private SinkConnectorConfig sinkConnectorConfig;
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java
new file mode 100644
index 000000000..a026f2aa2
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-slack/src/main/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnector.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.slack.sink.connector;
+
+import org.apache.eventmesh.connector.slack.sink.config.SlackSinkConfig;
+import org.apache.eventmesh.openconnect.api.config.Config;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
+import org.apache.eventmesh.openconnect.api.sink.Sink;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Objects;
+
+import com.slack.api.Slack;
+import com.slack.api.methods.MethodsClient;
+import com.slack.api.methods.response.chat.ChatPostMessageResponse;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Slack sink connector.
+ * Slack doc: <a href="https://api.slack.com/messaging/sending">...</a>
+ */
+@Slf4j
+public class SlackSinkConnector implements Sink {
+
+    private SlackSinkConfig sinkConfig;
+
+    private volatile boolean isRunning = false;
+
+    private MethodsClient client;
+
+    @Override
+    public Class<? extends Config> configClass() {
+        return SlackSinkConfig.class;
+    }
+
+    @Override
+    public void init(Config config) throws Exception {
+        // init config for slack sink connector
+        this.sinkConfig = (SlackSinkConfig) config;
+        this.client = Slack.getInstance().methods();
+    }
+
+    @Override
+    public void init(ConnectorContext connectorContext) throws Exception {
+        // init config for slack sink connector
+        SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext;
+        this.sinkConfig = (SlackSinkConfig) sinkConnectorContext.getSinkConfig();
+        this.client = Slack.getInstance().methods();
+    }
+
+    @Override
+    public void start() {
+        isRunning = true;
+    }
+
+    @Override
+    public void commit(ConnectRecord record) {
+
+    }
+
+    @Override
+    public String name() {
+        return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
+    }
+
+    @Override
+    public void stop() {
+        isRunning = false;
+    }
+
+    public boolean isRunning() {
+        return isRunning;
+    }
+
+    @SneakyThrows
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) {
+        for (ConnectRecord record : sinkRecords) {
+            publishMessage(record);
+        }
+    }
+
+    private void publishMessage(ConnectRecord record) {
+        try {
+            ChatPostMessageResponse response = client.chatPostMessage(r -> r
+                .token(sinkConfig.getSinkConnectorConfig().getAppToken())
+                .channel(sinkConfig.getSinkConnectorConfig().getChannelId())
+                .text(new String((byte[]) record.getData())));
+            if (Objects.nonNull(response) && StringUtils.isNotBlank(response.getError())) {
+                throw new IllegalAccessException(response.getError());
+            }
+        } catch (Exception e) {
+            log.error("Send message to slack error.", e);
+        }
+    }
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-slack/src/main/resources/server-config.yml
 
b/eventmesh-connectors/eventmesh-connector-slack/src/main/resources/server-config.yml
new file mode 100644
index 000000000..20d6e6d59
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-slack/src/main/resources/server-config.yml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sinkEnable: true
diff --git 
a/eventmesh-connectors/eventmesh-connector-slack/src/main/resources/sink-config.yml
 
b/eventmesh-connectors/eventmesh-connector-slack/src/main/resources/sink-config.yml
new file mode 100644
index 000000000..062eaf349
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-slack/src/main/resources/sink-config.yml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+    meshAddress: 127.0.0.1:10000
+    subject: TEST-TOPIC-SLACK
+    idc: FT
+    env: PRD
+    group: slackSink
+    appId: 5034
+    userName: slackSinkUser
+    passWord: slackPassWord
+sinkConnectorConfig:
+    connectorName: slackSink
+    appToken: slackAppToken
+    channelId: slackChannelId
diff --git 
a/eventmesh-connectors/eventmesh-connector-slack/src/test/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnectorTest.java
 
b/eventmesh-connectors/eventmesh-connector-slack/src/test/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnectorTest.java
new file mode 100644
index 000000000..3f0a32755
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-slack/src/test/java/org/apache/eventmesh/connector/slack/sink/connector/SlackSinkConnectorTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.slack.sink.connector;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.eventmesh.connector.slack.sink.config.SlackSinkConfig;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.platform.commons.support.HierarchyTraversalMode;
+import org.junit.platform.commons.support.ReflectionSupport;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.slack.api.RequestConfigurator;
+import com.slack.api.methods.MethodsClient;
+import com.slack.api.methods.response.chat.ChatPostMessageResponse;
+
+@ExtendWith(MockitoExtension.class)
+public class SlackSinkConnectorTest {
+
+    private SlackSinkConnector sinkConnector;
+
+    @Mock
+    private MethodsClient client;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        sinkConnector = new SlackSinkConnector();
+        SlackSinkConfig sinkConfig = (SlackSinkConfig) ConfigUtil.parse(sinkConnector.configClass());
+        sinkConnector.init(sinkConfig);
+        doReturn(new ChatPostMessageResponse())
+            .when(client).chatPostMessage(any(RequestConfigurator.class));
+        Field clientField = ReflectionSupport.findFields(sinkConnector.getClass(),
+            (f) -> f.getName().equals("client"),
+            HierarchyTraversalMode.BOTTOM_UP).get(0);
+        clientField.setAccessible(true);
+        clientField.set(sinkConnector, client);
+        sinkConnector.start();
+    }
+
+    @Test
+    public void testSendMessageToSlack() throws Exception {
+        final int times = 3;
+        List<ConnectRecord> records = new ArrayList<>();
+        for (int i = 0; i < times; i++) {
+            RecordPartition partition = new RecordPartition();
+            RecordOffset offset = new RecordOffset();
+            ConnectRecord connectRecord = new ConnectRecord(partition, offset,
+                System.currentTimeMillis(), "Hello, EventMesh!".getBytes(StandardCharsets.UTF_8));
+            records.add(connectRecord);
+        }
+        sinkConnector.put(records);
+        verify(client, times(times)).chatPostMessage(any(RequestConfigurator.class));
+    }
+
+    @AfterEach
+    public void tearDown() {
+        sinkConnector.stop();
+    }
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-slack/src/test/resources/server-config.yml
 
b/eventmesh-connectors/eventmesh-connector-slack/src/test/resources/server-config.yml
new file mode 100644
index 000000000..20d6e6d59
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-slack/src/test/resources/server-config.yml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sinkEnable: true
diff --git 
a/eventmesh-connectors/eventmesh-connector-slack/src/test/resources/sink-config.yml
 
b/eventmesh-connectors/eventmesh-connector-slack/src/test/resources/sink-config.yml
new file mode 100644
index 000000000..062eaf349
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-slack/src/test/resources/sink-config.yml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+    meshAddress: 127.0.0.1:10000
+    subject: TEST-TOPIC-SLACK
+    idc: FT
+    env: PRD
+    group: slackSink
+    appId: 5034
+    userName: slackSinkUser
+    passWord: slackPassWord
+sinkConnectorConfig:
+    connectorName: slackSink
+    appToken: slackAppToken
+    channelId: slackChannelId
diff --git a/settings.gradle b/settings.gradle
index 85877303e..9555fc5db 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -47,6 +47,7 @@ include 'eventmesh-connectors:eventmesh-connector-prometheus'
 include 'eventmesh-connectors:eventmesh-connector-dingding'
 include 'eventmesh-connectors:eventmesh-connector-feishu'
 include 'eventmesh-connectors:eventmesh-connector-wecom'
+include 'eventmesh-connectors:eventmesh-connector-slack'
 
 include 'eventmesh-storage-plugin:eventmesh-storage-api'
 include 'eventmesh-storage-plugin:eventmesh-storage-standalone'


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to