This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new fc900e57d [ISSUE #4411] Add DingDing sink connnector. (#4557)
fc900e57d is described below

commit fc900e57dcd1a057931c03ae90038384e2776695
Author: yanrongzhen <[email protected]>
AuthorDate: Thu Nov 16 10:19:34 2023 +0800

    [ISSUE #4411] Add DingDing sink connnector. (#4557)
    
    * Add dingding sink connector.
    
    * fix: Remove unused config.
    
    * fix: Remove unused config.
    
    * fix: review
    
    * Add example config.
    
    * fix: review
    
    * fix: continue when record data is null.
---
 .../eventmesh-connector-dingding/build.gradle      |  36 ++++
 .../constants/ConnectRecordExtensionKeys.java      |  28 +++
 .../config/DingDingConnectServerConfig.java        |  30 ++++
 .../config/DingDingMessageTemplateType.java        |  43 +++++
 .../dingding/server/DingDingConnectServer.java     |  38 ++++
 .../dingding/sink/config/DingDingSinkConfig.java   |  30 ++++
 .../dingding/sink/config/SinkConnectorConfig.java  |  36 ++++
 .../sink/connector/DingDingSinkConnector.java      | 193 +++++++++++++++++++++
 .../src/main/resources/server-config.yml           |  18 ++
 .../src/main/resources/sink-config.yml             |  32 ++++
 .../sink/connector/DingDingSinkConnectorTest.java  | 112 ++++++++++++
 .../src/test/resources/server-config.yml           |  18 ++
 .../src/test/resources/sink-config.yml             |  32 ++++
 settings.gradle                                    |   2 +-
 14 files changed, 647 insertions(+), 1 deletion(-)

diff --git a/eventmesh-connectors/eventmesh-connector-dingding/build.gradle 
b/eventmesh-connectors/eventmesh-connector-dingding/build.gradle
new file mode 100644
index 000000000..46d16c3b3
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-dingding/build.gradle
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+configurations {
+    implementation.exclude group: 'ch.qos.logback', module: 'logback-classic'
+    implementation.exclude group: 'log4j', module: 'log4j'
+    testImplementation.exclude group: 'org.apache.logging.log4j', module: 
'log4j-to-slf4j'
+}
+
+dependencies {
+    implementation project(":eventmesh-common")
+    implementation project(":eventmesh-sdks:eventmesh-sdk-java")
+    implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
+    implementation "com.aliyun:dingtalk:2.0.61"
+    implementation 'com.google.guava:guava'
+
+    compileOnly 'org.projectlombok:lombok'
+    annotationProcessor 'org.projectlombok:lombok'
+
+    testImplementation "org.mockito:mockito-core"
+    testImplementation "org.mockito:mockito-junit-jupiter"
+}
\ No newline at end of file
diff --git 
a/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/common/constants/ConnectRecordExtensionKeys.java
 
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/common/constants/ConnectRecordExtensionKeys.java
new file mode 100644
index 000000000..425a130dc
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/common/constants/ConnectRecordExtensionKeys.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.dingding.common.constants;
+
+/**
+ * Constants of record extension key.
+ */
+public interface ConnectRecordExtensionKeys {
+
+    String DINGDING_TEMPLATE_TYPE_KEY = "dingDingTemplateTypeKey";
+
+    String DINGDING_MARKDOWN_MESSAGE_TITLE = "dingDingMarkdownMessageTitle";
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/config/DingDingConnectServerConfig.java
 
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/config/DingDingConnectServerConfig.java
new file mode 100644
index 000000000..2475ccd85
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/config/DingDingConnectServerConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.dingding.config;
+
+import org.apache.eventmesh.openconnect.api.config.Config;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class DingDingConnectServerConfig extends Config {
+
+    private boolean sinkEnable;
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/config/DingDingMessageTemplateType.java
 
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/config/DingDingMessageTemplateType.java
new file mode 100644
index 000000000..5cb2e077a
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/config/DingDingMessageTemplateType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.dingding.config;
+
+import java.util.Arrays;
+
+public enum DingDingMessageTemplateType {
+
+    PLAIN_TEXT("sampleText"),
+    MARKDOWN("sampleMarkdown");
+
+    private final String templateKey;
+
+    DingDingMessageTemplateType(String templateKey) {
+        this.templateKey = templateKey;
+    }
+
+    public String getTemplateKey() {
+        return templateKey;
+    }
+
+    public static DingDingMessageTemplateType of(String templateKey) {
+        return Arrays.stream(values())
+            .filter(v -> v.getTemplateKey().equals(templateKey))
+            .findFirst()
+            .orElseThrow(() -> new IllegalArgumentException("TemplateKey: " + 
templateKey + " not found."));
+    }
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/server/DingDingConnectServer.java
 
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/server/DingDingConnectServer.java
new file mode 100644
index 000000000..abd0ab844
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/server/DingDingConnectServer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.dingding.server;
+
+import org.apache.eventmesh.common.Constants;
+import 
org.apache.eventmesh.connector.dingding.config.DingDingConnectServerConfig;
+import 
org.apache.eventmesh.connector.dingding.sink.connector.DingDingSinkConnector;
+import org.apache.eventmesh.openconnect.Application;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+public class DingDingConnectServer {
+
+    public static void main(String[] args) throws Exception {
+
+        DingDingConnectServerConfig dingDingConnectServerConfig = 
ConfigUtil.parse(DingDingConnectServerConfig.class,
+            Constants.CONNECT_SERVER_CONFIG_FILE_NAME);
+
+        if (dingDingConnectServerConfig.isSinkEnable()) {
+            Application application = new Application();
+            application.run(DingDingSinkConnector.class);
+        }
+    }
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/sink/config/DingDingSinkConfig.java
 
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/sink/config/DingDingSinkConfig.java
new file mode 100644
index 000000000..242d31b57
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/sink/config/DingDingSinkConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.dingding.sink.config;
+
+import org.apache.eventmesh.openconnect.api.config.SinkConfig;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class DingDingSinkConfig extends SinkConfig {
+
+    private SinkConnectorConfig sinkConnectorConfig;
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/sink/config/SinkConnectorConfig.java
 
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/sink/config/SinkConnectorConfig.java
new file mode 100644
index 000000000..3ae669abf
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/sink/config/SinkConnectorConfig.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.dingding.sink.config;
+
+import lombok.Data;
+
+@Data
+public class SinkConnectorConfig {
+
+    private String connectorName;
+
+    private String appKey;
+
+    private String appSecret;
+
+    private String openConversationId;
+
+    private String robotCode;
+
+    private String coolAppCode;
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/sink/connector/DingDingSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/sink/connector/DingDingSinkConnector.java
new file mode 100644
index 000000000..252108bcc
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/java/org/apache/eventmesh/connector/dingding/sink/connector/DingDingSinkConnector.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.dingding.sink.connector;
+
+import org.apache.eventmesh.common.utils.JsonUtils;
+import 
org.apache.eventmesh.connector.dingding.common.constants.ConnectRecordExtensionKeys;
+import 
org.apache.eventmesh.connector.dingding.config.DingDingMessageTemplateType;
+import org.apache.eventmesh.connector.dingding.sink.config.DingDingSinkConfig;
+import org.apache.eventmesh.openconnect.api.config.Config;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
+import org.apache.eventmesh.openconnect.api.sink.Sink;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import com.aliyun.dingtalkoauth2_1_0.models.GetAccessTokenRequest;
+import com.aliyun.dingtalkrobot_1_0.models.OrgGroupSendHeaders;
+import com.aliyun.dingtalkrobot_1_0.models.OrgGroupSendRequest;
+import com.aliyun.tea.TeaException;
+import com.aliyun.teautil.Common;
+import com.aliyun.teautil.models.RuntimeOptions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class DingDingSinkConnector implements Sink {
+
+    public static final Cache<String, String> AUTH_CACHE = 
CacheBuilder.newBuilder()
+        .initialCapacity(12)
+        .maximumSize(10)
+        .concurrencyLevel(5)
+        .expireAfterWrite(20, TimeUnit.MINUTES)
+        .build();
+
+    public static final String ACCESS_TOKEN_CACHE_KEY = "access_token";
+
+    private DingDingSinkConfig sinkConfig;
+
+    private com.aliyun.dingtalkrobot_1_0.Client sendMessageClient;
+
+    private com.aliyun.dingtalkoauth2_1_0.Client authClient;
+
+    private volatile boolean isRunning = false;
+
+    @Override
+    public Class<? extends Config> configClass() {
+        return DingDingSinkConfig.class;
+    }
+
+    @Override
+    public void init(Config config) throws Exception {
+        // init config for dingding sink connector
+        this.sinkConfig = (DingDingSinkConfig) config;
+        sendMessageClient = createSendMessageClient();
+        authClient = createOAuthClient();
+    }
+
+    @Override
+    public void init(ConnectorContext connectorContext) throws Exception {
+        // init config for dingding source connector
+        SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) 
connectorContext;
+        this.sinkConfig = (DingDingSinkConfig) 
sinkConnectorContext.getSinkConfig();
+        sendMessageClient = createSendMessageClient();
+        authClient = createOAuthClient();
+    }
+
+    @Override
+    public void start() {
+        isRunning = true;
+    }
+
+    @Override
+    public void commit(ConnectRecord record) {
+
+    }
+
+    @Override
+    public String name() {
+        return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
+    }
+
+    @Override
+    public void stop() {
+        isRunning = false;
+    }
+
+    public boolean isRunning() {
+        return isRunning;
+    }
+
+    @SneakyThrows
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) {
+        for (ConnectRecord record : sinkRecords) {
+            try {
+                if (Objects.isNull(record.getData())) {
+                    log.warn("ConnectRecord data is null, ignore.");
+                    continue;
+                }
+                String accessToken = getAccessToken();
+                OrgGroupSendHeaders orgGroupSendHeaders =
+                    new OrgGroupSendHeaders();
+                orgGroupSendHeaders.xAcsDingtalkAccessToken = accessToken;
+
+                String templateTypeKey = 
record.getExtension(ConnectRecordExtensionKeys.DINGDING_TEMPLATE_TYPE_KEY);
+                if (null == templateTypeKey || "null".equals(templateTypeKey)) 
{
+                    templateTypeKey = 
DingDingMessageTemplateType.PLAIN_TEXT.getTemplateKey();
+                }
+                DingDingMessageTemplateType templateType = 
DingDingMessageTemplateType.of(templateTypeKey);
+
+                Map<String, String> contentMap = new HashMap<>();
+                if (DingDingMessageTemplateType.PLAIN_TEXT == templateType) {
+                    contentMap.put("content", new String((byte[]) 
record.getData()));
+                } else if (DingDingMessageTemplateType.MARKDOWN == 
templateType) {
+                    String title = 
Optional.ofNullable(record.getExtension(ConnectRecordExtensionKeys.DINGDING_MARKDOWN_MESSAGE_TITLE))
+                        .orElse("EventMesh-Message");
+                    contentMap.put("title", title);
+                    contentMap.put("text", String.valueOf(record.getData()));
+                }
+
+                OrgGroupSendRequest orgGroupSendRequest =
+                    new OrgGroupSendRequest()
+                        .setMsgParam(JsonUtils.toJSONString(contentMap))
+                        .setMsgKey(templateType.getTemplateKey())
+                        
.setOpenConversationId(sinkConfig.getSinkConnectorConfig().getOpenConversationId())
+                        
.setRobotCode(sinkConfig.getSinkConnectorConfig().getRobotCode())
+                        
.setCoolAppCode(sinkConfig.getSinkConnectorConfig().getCoolAppCode());
+
+                try {
+                    
sendMessageClient.orgGroupSendWithOptions(orgGroupSendRequest, 
orgGroupSendHeaders, new RuntimeOptions());
+                } catch (TeaException e) {
+                    if (!Common.empty(e.code) && !Common.empty(e.message)) {
+                        String errorMessage = e.getMessage();
+                        if 
("invalidParameter.token.invalid".equals(errorMessage)) {
+                            AUTH_CACHE.invalidate(ACCESS_TOKEN_CACHE_KEY);
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                log.error("Failed to sink message to DingDing.", e);
+            }
+        }
+    }
+
+    @SneakyThrows
+    private String getAccessToken() {
+        return AUTH_CACHE.get(ACCESS_TOKEN_CACHE_KEY, () -> {
+            GetAccessTokenRequest getAccessTokenRequest =
+                new GetAccessTokenRequest()
+                    .setAppKey(sinkConfig.getSinkConnectorConfig().getAppKey())
+                    
.setAppSecret(sinkConfig.getSinkConnectorConfig().getAppSecret());
+            return 
authClient.getAccessToken(getAccessTokenRequest).getBody().getAccessToken();
+        });
+    }
+
+    public static com.aliyun.dingtalkrobot_1_0.Client 
createSendMessageClient() throws Exception {
+        com.aliyun.teaopenapi.models.Config config = new 
com.aliyun.teaopenapi.models.Config();
+        config.protocol = "https";
+        config.regionId = "central";
+        return new com.aliyun.dingtalkrobot_1_0.Client(config);
+    }
+
+    public static com.aliyun.dingtalkoauth2_1_0.Client createOAuthClient() 
throws Exception {
+        com.aliyun.teaopenapi.models.Config config = new 
com.aliyun.teaopenapi.models.Config();
+        config.protocol = "https";
+        config.regionId = "central";
+        return new com.aliyun.dingtalkoauth2_1_0.Client(config);
+    }
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-dingding/src/main/resources/server-config.yml
 
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/resources/server-config.yml
new file mode 100644
index 000000000..20d6e6d59
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/resources/server-config.yml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sinkEnable: true
diff --git 
a/eventmesh-connectors/eventmesh-connector-dingding/src/main/resources/sink-config.yml
 
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/resources/sink-config.yml
new file mode 100644
index 000000000..aceb61fd4
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-dingding/src/main/resources/sink-config.yml
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+    meshAddress: 127.0.0.1:10000
+    subject: TEST-TOPIC-DINGDING
+    idc: FT
+    env: PRD
+    group: dingDingSink
+    appId: 5034
+    userName: dingDingSinkUser
+    passWord: dingDingPassWord
+sinkConnectorConfig:
+    connectorName: dingDingSink
+    appKey: dingDingAppKey
+    appSecret: dingDingAppSecret
+    openConversationId: dingDingOpenConversationId
+    robotCode: dingDingRobotCode
diff --git 
a/eventmesh-connectors/eventmesh-connector-dingding/src/test/java/org/apache/eventmesh/connector/dingding/sink/connector/DingDingSinkConnectorTest.java
 
b/eventmesh-connectors/eventmesh-connector-dingding/src/test/java/org/apache/eventmesh/connector/dingding/sink/connector/DingDingSinkConnectorTest.java
new file mode 100644
index 000000000..3de799d94
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-dingding/src/test/java/org/apache/eventmesh/connector/dingding/sink/connector/DingDingSinkConnectorTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.dingding.sink.connector;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import 
org.apache.eventmesh.connector.dingding.common.constants.ConnectRecordExtensionKeys;
+import 
org.apache.eventmesh.connector.dingding.config.DingDingMessageTemplateType;
+import org.apache.eventmesh.connector.dingding.sink.config.DingDingSinkConfig;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.platform.commons.support.HierarchyTraversalMode;
+import org.junit.platform.commons.support.ReflectionSupport;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.aliyun.dingtalkoauth2_1_0.models.GetAccessTokenResponse;
+import com.aliyun.dingtalkoauth2_1_0.models.GetAccessTokenResponseBody;
+
+@ExtendWith(MockitoExtension.class)
+public class DingDingSinkConnectorTest {
+
+    @Spy
+    private DingDingSinkConnector connector;
+
+    @Mock
+    private com.aliyun.dingtalkrobot_1_0.Client sendMessageClient;
+
+    @Mock
+    private com.aliyun.dingtalkoauth2_1_0.Client authClient;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        Mockito.doReturn(null).when(sendMessageClient)
+            .orgGroupSendWithOptions(Mockito.any(), Mockito.any(), 
Mockito.any());
+        GetAccessTokenResponse response = new GetAccessTokenResponse();
+        GetAccessTokenResponseBody body = new GetAccessTokenResponseBody();
+        body.setAccessToken("testAccessToken");
+        response.setBody(body);
+        
Mockito.doReturn(response).when(authClient).getAccessToken(Mockito.any());
+
+        DingDingSinkConfig sinkConfig = (DingDingSinkConfig) 
ConfigUtil.parse(connector.configClass());
+        connector.init(sinkConfig);
+        Field sendMessageClientField = 
ReflectionSupport.findFields(connector.getClass(),
+            (f) -> f.getName().equals("sendMessageClient"),
+            HierarchyTraversalMode.BOTTOM_UP).get(0);
+        Field authClientField = 
ReflectionSupport.findFields(connector.getClass(),
+            (f) -> f.getName().equals("authClient"),
+            HierarchyTraversalMode.BOTTOM_UP).get(0);
+        sendMessageClientField.setAccessible(true);
+        authClientField.setAccessible(true);
+        sendMessageClientField.set(connector, sendMessageClient);
+        authClientField.set(connector, authClient);
+        connector.start();
+    }
+
+    @Test
+    public void testSendMessageToDingDing() throws Exception {
+        final int times = 3;
+        List<ConnectRecord> records = new ArrayList<>();
+        for (int i = 0; i < times; i++) {
+            RecordPartition partition = new RecordPartition();
+            RecordOffset offset = new RecordOffset();
+            ConnectRecord connectRecord = new ConnectRecord(partition, offset,
+                System.currentTimeMillis(), "Hello, 
EventMesh!".getBytes(StandardCharsets.UTF_8));
+            
connectRecord.addExtension(ConnectRecordExtensionKeys.DINGDING_TEMPLATE_TYPE_KEY,
+                DingDingMessageTemplateType.PLAIN_TEXT.getTemplateKey());
+            records.add(connectRecord);
+        }
+        connector.put(records);
+        verify(sendMessageClient, times(times)).orgGroupSendWithOptions(any(), 
any(), any());
+        // verify for access token cache.
+        verify(authClient, times(1)).getAccessToken(any());
+    }
+
+    @AfterEach
+    public void tearDown() {
+        
DingDingSinkConnector.AUTH_CACHE.invalidate(DingDingSinkConnector.ACCESS_TOKEN_CACHE_KEY);
+        connector.stop();
+    }
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-dingding/src/test/resources/server-config.yml
 
b/eventmesh-connectors/eventmesh-connector-dingding/src/test/resources/server-config.yml
new file mode 100644
index 000000000..20d6e6d59
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-dingding/src/test/resources/server-config.yml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sinkEnable: true
diff --git 
a/eventmesh-connectors/eventmesh-connector-dingding/src/test/resources/sink-config.yml
 
b/eventmesh-connectors/eventmesh-connector-dingding/src/test/resources/sink-config.yml
new file mode 100644
index 000000000..53f4a24da
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-dingding/src/test/resources/sink-config.yml
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+    meshAddress: 127.0.0.1:10000
+    subject: TEST-TOPIC-DINGDING
+    idc: FT
+    env: PRD
+    group: dingDingSink
+    appId: 5034
+    userName: dingDingSinkUser
+    passWord: dingDingPassWord
+sinkConnectorConfig:
+    connectorName: dingDingSink
+    appKey: dingDingAppKey
+    appSecret: dingDingAppSecret
+    openConversationId: dingDingOpenConversationId
+    robotCode: dingDingRobotCode
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index fd91ebd59..6ce44c1d6 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -44,6 +44,7 @@ include 'eventmesh-connectors:eventmesh-connector-jdbc'
 include 'eventmesh-connectors:eventmesh-connector-file'
 include 'eventmesh-connectors:eventmesh-connector-spring'
 include 'eventmesh-connectors:eventmesh-connector-prometheus'
+include 'eventmesh-connectors:eventmesh-connector-dingding'
 
 include 'eventmesh-storage-plugin:eventmesh-storage-api'
 include 'eventmesh-storage-plugin:eventmesh-storage-standalone'
@@ -93,4 +94,3 @@ include 'eventmesh-webhook:eventmesh-webhook-receive'
 include 'eventmesh-retry'
 include 'eventmesh-retry:eventmesh-retry-api'
 include 'eventmesh-retry:eventmesh-retry-rocketmq'
-


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to