This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new fc900e57d [ISSUE #4411] Add DingDing sink connnector. (#4557)
fc900e57d is described below
commit fc900e57dcd1a057931c03ae90038384e2776695
Author: yanrongzhen <[email protected]>
AuthorDate: Thu Nov 16 10:19:34 2023 +0800
[ISSUE #4411] Add DingDing sink connnector. (#4557)
* Add dingding sink connector.
* fix: Remove unused config.
* fix: Remove unused config.
* fix: review
* Add example config.
* fix: review
* fix: continue when record data is null.
---
.../eventmesh-connector-dingding/build.gradle | 36 ++++
.../constants/ConnectRecordExtensionKeys.java | 28 +++
.../config/DingDingConnectServerConfig.java | 30 ++++
.../config/DingDingMessageTemplateType.java | 43 +++++
.../dingding/server/DingDingConnectServer.java | 38 ++++
.../dingding/sink/config/DingDingSinkConfig.java | 30 ++++
.../dingding/sink/config/SinkConnectorConfig.java | 36 ++++
.../sink/connector/DingDingSinkConnector.java | 193 +++++++++++++++++++++
.../src/main/resources/server-config.yml | 18 ++
.../src/main/resources/sink-config.yml | 32 ++++
.../sink/connector/DingDingSinkConnectorTest.java | 112 ++++++++++++
.../src/test/resources/server-config.yml | 18 ++
.../src/test/resources/sink-config.yml | 32 ++++
settings.gradle | 2 +-
14 files changed, 647 insertions(+), 1 deletion(-)
diff --git a/eventmesh-connectors/eventmesh-connector-dingding/build.gradle
b/eventmesh-connectors/eventmesh-connector-dingding/build.gradle
new file mode 100644
index 000000000..46d16c3b3
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-dingding/build.gradle
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+configurations {
+ implementation.exclude group: 'ch.qos.logback', module: 'logback-classic'
+ implementation.exclude group: 'log4j', module: 'log4j'
+ testImplementation.exclude group: 'org.apache.logging.log4j', module:
'log4j-to-slf4j'
+}
+
+dependencies {
+ implementation project(":eventmesh-common")
+ implementation project(":eventmesh-sdks:eventmesh-sdk-java")
+ implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
+ implementation "com.aliyun:dingtalk:2.0.61"
+ implementation 'com.google.guava:guava'
+
+ compileOnly 'org.projectlombok:lombok'
+ annotationProcessor 'org.projectlombok:lombok'
+
+ testImplementation "org.mockito:mockito-core"
+ testImplementation "org.mockito:mockito-junit-jupiter"
+}
\ No newline at end of file
diff --git
a/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/common/constants/ConnectRecordExtensionKeys.java
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/common/constants/ConnectRecordExtensionKeys.java
new file mode 100644
index 000000000..425a130dc
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/common/constants/ConnectRecordExtensionKeys.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.dingding.common.constants;
+
+/**
+ * Constants of record extension key.
+ */
+public interface ConnectRecordExtensionKeys {
+
+ String DINGDING_TEMPLATE_TYPE_KEY = "dingDingTemplateTypeKey";
+
+ String DINGDING_MARKDOWN_MESSAGE_TITLE = "dingDingMarkdownMessageTitle";
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/config/DingDingConnectServerConfig.java
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/config/DingDingConnectServerConfig.java
new file mode 100644
index 000000000..2475ccd85
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/config/DingDingConnectServerConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.dingding.config;
+
+import org.apache.eventmesh.openconnect.api.config.Config;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class DingDingConnectServerConfig extends Config {
+
+ private boolean sinkEnable;
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/config/DingDingMessageTemplateType.java
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/config/DingDingMessageTemplateType.java
new file mode 100644
index 000000000..5cb2e077a
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/config/DingDingMessageTemplateType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.dingding.config;
+
+import java.util.Arrays;
+
+public enum DingDingMessageTemplateType {
+
+ PLAIN_TEXT("sampleText"),
+ MARKDOWN("sampleMarkdown");
+
+ private final String templateKey;
+
+ DingDingMessageTemplateType(String templateKey) {
+ this.templateKey = templateKey;
+ }
+
+ public String getTemplateKey() {
+ return templateKey;
+ }
+
+ public static DingDingMessageTemplateType of(String templateKey) {
+ return Arrays.stream(values())
+ .filter(v -> v.getTemplateKey().equals(templateKey))
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException("TemplateKey: " +
templateKey + " not found."));
+ }
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/server/DingDingConnectServer.java
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/server/DingDingConnectServer.java
new file mode 100644
index 000000000..abd0ab844
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/server/DingDingConnectServer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.dingding.server;
+
+import org.apache.eventmesh.common.Constants;
+import
org.apache.eventmesh.connector.dingding.config.DingDingConnectServerConfig;
+import
org.apache.eventmesh.connector.dingding.sink.connector.DingDingSinkConnector;
+import org.apache.eventmesh.openconnect.Application;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+public class DingDingConnectServer {
+
+ public static void main(String[] args) throws Exception {
+
+ DingDingConnectServerConfig dingDingConnectServerConfig =
ConfigUtil.parse(DingDingConnectServerConfig.class,
+ Constants.CONNECT_SERVER_CONFIG_FILE_NAME);
+
+ if (dingDingConnectServerConfig.isSinkEnable()) {
+ Application application = new Application();
+ application.run(DingDingSinkConnector.class);
+ }
+ }
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/sink/config/DingDingSinkConfig.java
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/sink/config/DingDingSinkConfig.java
new file mode 100644
index 000000000..242d31b57
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/sink/config/DingDingSinkConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.dingding.sink.config;
+
+import org.apache.eventmesh.openconnect.api.config.SinkConfig;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class DingDingSinkConfig extends SinkConfig {
+
+ private SinkConnectorConfig sinkConnectorConfig;
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/sink/config/SinkConnectorConfig.java
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/sink/config/SinkConnectorConfig.java
new file mode 100644
index 000000000..3ae669abf
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/sink/config/SinkConnectorConfig.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.dingding.sink.config;
+
+import lombok.Data;
+
+@Data
+public class SinkConnectorConfig {
+
+ private String connectorName;
+
+ private String appKey;
+
+ private String appSecret;
+
+ private String openConversationId;
+
+ private String robotCode;
+
+ private String coolAppCode;
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/sink/connector/DingDingSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/sink/connector/DingDingSinkConnector.java
new file mode 100644
index 000000000..252108bcc
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/sink/connector/DingDingSinkConnector.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.dingding.sink.connector;
+
+import org.apache.eventmesh.common.utils.JsonUtils;
+import
org.apache.eventmesh.connector.dingding.common.constants.ConnectRecordExtensionKeys;
+import
org.apache.eventmesh.connector.dingding.config.DingDingMessageTemplateType;
+import org.apache.eventmesh.connector.dingding.sink.config.DingDingSinkConfig;
+import org.apache.eventmesh.openconnect.api.config.Config;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
+import org.apache.eventmesh.openconnect.api.sink.Sink;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import com.aliyun.dingtalkoauth2_1_0.models.GetAccessTokenRequest;
+import com.aliyun.dingtalkrobot_1_0.models.OrgGroupSendHeaders;
+import com.aliyun.dingtalkrobot_1_0.models.OrgGroupSendRequest;
+import com.aliyun.tea.TeaException;
+import com.aliyun.teautil.Common;
+import com.aliyun.teautil.models.RuntimeOptions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class DingDingSinkConnector implements Sink {
+
+ public static final Cache<String, String> AUTH_CACHE =
CacheBuilder.newBuilder()
+ .initialCapacity(12)
+ .maximumSize(10)
+ .concurrencyLevel(5)
+ .expireAfterWrite(20, TimeUnit.MINUTES)
+ .build();
+
+ public static final String ACCESS_TOKEN_CACHE_KEY = "access_token";
+
+ private DingDingSinkConfig sinkConfig;
+
+ private com.aliyun.dingtalkrobot_1_0.Client sendMessageClient;
+
+ private com.aliyun.dingtalkoauth2_1_0.Client authClient;
+
+ private volatile boolean isRunning = false;
+
+ @Override
+ public Class<? extends Config> configClass() {
+ return DingDingSinkConfig.class;
+ }
+
+ @Override
+ public void init(Config config) throws Exception {
+ // init config for dingding sink connector
+ this.sinkConfig = (DingDingSinkConfig) config;
+ sendMessageClient = createSendMessageClient();
+ authClient = createOAuthClient();
+ }
+
+ @Override
+ public void init(ConnectorContext connectorContext) throws Exception {
+ // init config for dingding source connector
+ SinkConnectorContext sinkConnectorContext = (SinkConnectorContext)
connectorContext;
+ this.sinkConfig = (DingDingSinkConfig)
sinkConnectorContext.getSinkConfig();
+ sendMessageClient = createSendMessageClient();
+ authClient = createOAuthClient();
+ }
+
+ @Override
+ public void start() {
+ isRunning = true;
+ }
+
+ @Override
+ public void commit(ConnectRecord record) {
+
+ }
+
+ @Override
+ public String name() {
+ return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
+ }
+
+ @Override
+ public void stop() {
+ isRunning = false;
+ }
+
+ public boolean isRunning() {
+ return isRunning;
+ }
+
+ @SneakyThrows
+ @Override
+ public void put(List<ConnectRecord> sinkRecords) {
+ for (ConnectRecord record : sinkRecords) {
+ try {
+ if (Objects.isNull(record.getData())) {
+ log.warn("ConnectRecord data is null, ignore.");
+ continue;
+ }
+ String accessToken = getAccessToken();
+ OrgGroupSendHeaders orgGroupSendHeaders =
+ new OrgGroupSendHeaders();
+ orgGroupSendHeaders.xAcsDingtalkAccessToken = accessToken;
+
+ String templateTypeKey =
record.getExtension(ConnectRecordExtensionKeys.DINGDING_TEMPLATE_TYPE_KEY);
+ if (null == templateTypeKey || "null".equals(templateTypeKey))
{
+ templateTypeKey =
DingDingMessageTemplateType.PLAIN_TEXT.getTemplateKey();
+ }
+ DingDingMessageTemplateType templateType =
DingDingMessageTemplateType.of(templateTypeKey);
+
+ Map<String, String> contentMap = new HashMap<>();
+ if (DingDingMessageTemplateType.PLAIN_TEXT == templateType) {
+ contentMap.put("content", new String((byte[])
record.getData()));
+ } else if (DingDingMessageTemplateType.MARKDOWN ==
templateType) {
+ String title =
Optional.ofNullable(record.getExtension(ConnectRecordExtensionKeys.DINGDING_MARKDOWN_MESSAGE_TITLE))
+ .orElse("EventMesh-Message");
+ contentMap.put("title", title);
+ contentMap.put("text", String.valueOf(record.getData()));
+ }
+
+ OrgGroupSendRequest orgGroupSendRequest =
+ new OrgGroupSendRequest()
+ .setMsgParam(JsonUtils.toJSONString(contentMap))
+ .setMsgKey(templateType.getTemplateKey())
+
.setOpenConversationId(sinkConfig.getSinkConnectorConfig().getOpenConversationId())
+
.setRobotCode(sinkConfig.getSinkConnectorConfig().getRobotCode())
+
.setCoolAppCode(sinkConfig.getSinkConnectorConfig().getCoolAppCode());
+
+ try {
+
sendMessageClient.orgGroupSendWithOptions(orgGroupSendRequest,
orgGroupSendHeaders, new RuntimeOptions());
+ } catch (TeaException e) {
+ if (!Common.empty(e.code) && !Common.empty(e.message)) {
+ String errorMessage = e.getMessage();
+ if
("invalidParameter.token.invalid".equals(errorMessage)) {
+ AUTH_CACHE.invalidate(ACCESS_TOKEN_CACHE_KEY);
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("Failed to sink message to DingDing.", e);
+ }
+ }
+ }
+
+ @SneakyThrows
+ private String getAccessToken() {
+ return AUTH_CACHE.get(ACCESS_TOKEN_CACHE_KEY, () -> {
+ GetAccessTokenRequest getAccessTokenRequest =
+ new GetAccessTokenRequest()
+ .setAppKey(sinkConfig.getSinkConnectorConfig().getAppKey())
+
.setAppSecret(sinkConfig.getSinkConnectorConfig().getAppSecret());
+ return
authClient.getAccessToken(getAccessTokenRequest).getBody().getAccessToken();
+ });
+ }
+
+ public static com.aliyun.dingtalkrobot_1_0.Client
createSendMessageClient() throws Exception {
+ com.aliyun.teaopenapi.models.Config config = new
com.aliyun.teaopenapi.models.Config();
+ config.protocol = "https";
+ config.regionId = "central";
+ return new com.aliyun.dingtalkrobot_1_0.Client(config);
+ }
+
+ public static com.aliyun.dingtalkoauth2_1_0.Client createOAuthClient()
throws Exception {
+ com.aliyun.teaopenapi.models.Config config = new
com.aliyun.teaopenapi.models.Config();
+ config.protocol = "https";
+ config.regionId = "central";
+ return new com.aliyun.dingtalkoauth2_1_0.Client(config);
+ }
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-dingding/src/main/resources/server-config.yml
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/resources/server-config.yml
new file mode 100644
index 000000000..20d6e6d59
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/resources/server-config.yml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sinkEnable: true
diff --git
a/eventmesh-connectors/eventmesh-connector-dingding/src/main/resources/sink-config.yml
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/resources/sink-config.yml
new file mode 100644
index 000000000..aceb61fd4
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/resources/sink-config.yml
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+ meshAddress: 127.0.0.1:10000
+ subject: TEST-TOPIC-DINGDING
+ idc: FT
+ env: PRD
+ group: dingDingSink
+ appId: 5034
+ userName: dingDingSinkUser
+ passWord: dingDingPassWord
+sinkConnectorConfig:
+ connectorName: dingDingSink
+ appKey: dingDingAppKey
+ appSecret: dingDingAppSecret
+ openConversationId: dingDingOpenConversationId
+ robotCode: dingDingRobotCode
diff --git
a/eventmesh-connectors/eventmesh-connector-dingding/src/test/java/org/apache/eventmesh/connector/dingding/sink/connector/DingDingSinkConnectorTest.java
b/eventmesh-connectors/eventmesh-connector-dingding/src/test/java/org/apache/eventmesh/connector/dingding/sink/connector/DingDingSinkConnectorTest.java
new file mode 100644
index 000000000..3de799d94
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-dingding/src/test/java/org/apache/eventmesh/connector/dingding/sink/connector/DingDingSinkConnectorTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.dingding.sink.connector;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import
org.apache.eventmesh.connector.dingding.common.constants.ConnectRecordExtensionKeys;
+import
org.apache.eventmesh.connector.dingding.config.DingDingMessageTemplateType;
+import org.apache.eventmesh.connector.dingding.sink.config.DingDingSinkConfig;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.platform.commons.support.HierarchyTraversalMode;
+import org.junit.platform.commons.support.ReflectionSupport;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.aliyun.dingtalkoauth2_1_0.models.GetAccessTokenResponse;
+import com.aliyun.dingtalkoauth2_1_0.models.GetAccessTokenResponseBody;
+
+@ExtendWith(MockitoExtension.class)
+public class DingDingSinkConnectorTest {
+
+ @Spy
+ private DingDingSinkConnector connector;
+
+ @Mock
+ private com.aliyun.dingtalkrobot_1_0.Client sendMessageClient;
+
+ @Mock
+ private com.aliyun.dingtalkoauth2_1_0.Client authClient;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ Mockito.doReturn(null).when(sendMessageClient)
+ .orgGroupSendWithOptions(Mockito.any(), Mockito.any(),
Mockito.any());
+ GetAccessTokenResponse response = new GetAccessTokenResponse();
+ GetAccessTokenResponseBody body = new GetAccessTokenResponseBody();
+ body.setAccessToken("testAccessToken");
+ response.setBody(body);
+
Mockito.doReturn(response).when(authClient).getAccessToken(Mockito.any());
+
+ DingDingSinkConfig sinkConfig = (DingDingSinkConfig)
ConfigUtil.parse(connector.configClass());
+ connector.init(sinkConfig);
+ Field sendMessageClientField =
ReflectionSupport.findFields(connector.getClass(),
+ (f) -> f.getName().equals("sendMessageClient"),
+ HierarchyTraversalMode.BOTTOM_UP).get(0);
+ Field authClientField =
ReflectionSupport.findFields(connector.getClass(),
+ (f) -> f.getName().equals("authClient"),
+ HierarchyTraversalMode.BOTTOM_UP).get(0);
+ sendMessageClientField.setAccessible(true);
+ authClientField.setAccessible(true);
+ sendMessageClientField.set(connector, sendMessageClient);
+ authClientField.set(connector, authClient);
+ connector.start();
+ }
+
+ @Test
+ public void testSendMessageToDingDing() throws Exception {
+ final int times = 3;
+ List<ConnectRecord> records = new ArrayList<>();
+ for (int i = 0; i < times; i++) {
+ RecordPartition partition = new RecordPartition();
+ RecordOffset offset = new RecordOffset();
+ ConnectRecord connectRecord = new ConnectRecord(partition, offset,
+ System.currentTimeMillis(), "Hello,
EventMesh!".getBytes(StandardCharsets.UTF_8));
+
connectRecord.addExtension(ConnectRecordExtensionKeys.DINGDING_TEMPLATE_TYPE_KEY,
+ DingDingMessageTemplateType.PLAIN_TEXT.getTemplateKey());
+ records.add(connectRecord);
+ }
+ connector.put(records);
+ verify(sendMessageClient, times(times)).orgGroupSendWithOptions(any(),
any(), any());
+ // verify for access token cache.
+ verify(authClient, times(1)).getAccessToken(any());
+ }
+
+ @AfterEach
+ public void tearDown() {
+
DingDingSinkConnector.AUTH_CACHE.invalidate(DingDingSinkConnector.ACCESS_TOKEN_CACHE_KEY);
+ connector.stop();
+ }
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-dingding/src/test/resources/server-config.yml
b/eventmesh-connectors/eventmesh-connector-dingding/src/test/resources/server-config.yml
new file mode 100644
index 000000000..20d6e6d59
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-dingding/src/test/resources/server-config.yml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sinkEnable: true
diff --git
a/eventmesh-connectors/eventmesh-connector-dingding/src/test/resources/sink-config.yml
b/eventmesh-connectors/eventmesh-connector-dingding/src/test/resources/sink-config.yml
new file mode 100644
index 000000000..53f4a24da
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-dingding/src/test/resources/sink-config.yml
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+ meshAddress: 127.0.0.1:10000
+ subject: TEST-TOPIC-DINGDING
+ idc: FT
+ env: PRD
+ group: dingDingSink
+ appId: 5034
+ userName: dingDingSinkUser
+ passWord: dingDingPassWord
+sinkConnectorConfig:
+ connectorName: dingDingSink
+ appKey: dingDingAppKey
+ appSecret: dingDingAppSecret
+ openConversationId: dingDingOpenConversationId
+ robotCode: dingDingRobotCode
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index fd91ebd59..6ce44c1d6 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -44,6 +44,7 @@ include 'eventmesh-connectors:eventmesh-connector-jdbc'
include 'eventmesh-connectors:eventmesh-connector-file'
include 'eventmesh-connectors:eventmesh-connector-spring'
include 'eventmesh-connectors:eventmesh-connector-prometheus'
+include 'eventmesh-connectors:eventmesh-connector-dingding'
include 'eventmesh-storage-plugin:eventmesh-storage-api'
include 'eventmesh-storage-plugin:eventmesh-storage-standalone'
@@ -93,4 +94,3 @@ include 'eventmesh-webhook:eventmesh-webhook-receive'
include 'eventmesh-retry'
include 'eventmesh-retry:eventmesh-retry-api'
include 'eventmesh-retry:eventmesh-retry-rocketmq'
-
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]