This is an automated email from the ASF dual-hosted git repository.

pandaapo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new ad4ecf3d8 [ISSUE#4412] Add WeCom sink connector. (#4558)
ad4ecf3d8 is described below

commit ad4ecf3d8aa8ac5c66712fc6ad9aad9862ff80b4
Author: yanrongzhen <[email protected]>
AuthorDate: Sun Nov 19 18:15:27 2023 +0800

    [ISSUE#4412] Add WeCom sink connector. (#4558)
    
    * Add WeCom sink connector.
    
    * Refactor: use robot webhook instead of app notification.
    
    * fix: Add test resources.
    
    * fix: code review
    
    * fix code review
---
 .../eventmesh-connector-wecom/build.gradle         |  39 ++++++
 .../wecom/config/WeComConnectServerConfig.java     |  30 +++++
 .../wecom/config/WeComMessageTemplateType.java     |  43 ++++++
 .../constants/ConnectRecordExtensionKeys.java      |  27 ++++
 .../connector/wecom/server/WeComConnectServer.java |  38 ++++++
 .../wecom/sink/config/SinkConnectorConfig.java     |  29 ++++
 .../wecom/sink/config/WeComSinkConfig.java         |  30 +++++
 .../wecom/sink/connector/SendMessageRequest.java   |  39 ++++++
 .../wecom/sink/connector/SendMessageResponse.java  |  32 +++++
 .../wecom/sink/connector/WeComSinkConnector.java   | 150 +++++++++++++++++++++
 .../src/main/resources/server-config.yml           |  18 +++
 .../src/main/resources/sink-config.yml             |  29 ++++
 .../wecom/connector/WeComSinkConnectorTest.java    | 110 +++++++++++++++
 .../src/test/resources/server-config.yml           |  18 +++
 .../src/test/resources/sink-config.yml             |  29 ++++
 .../offsetmgmt/api/data/ConnectRecord.java         |   2 +-
 settings.gradle                                    |   2 +-
 17 files changed, 663 insertions(+), 2 deletions(-)

diff --git a/eventmesh-connectors/eventmesh-connector-wecom/build.gradle 
b/eventmesh-connectors/eventmesh-connector-wecom/build.gradle
new file mode 100644
index 000000000..746a4f372
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-wecom/build.gradle
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+configurations {
+    implementation.exclude group: 'ch.qos.logback', module: 'logback-classic'
+    implementation.exclude group: 'log4j', module: 'log4j'
+    testImplementation.exclude group: 'org.apache.logging.log4j', module: 'log4j-to-slf4j'
+}
+
+dependencies {
+    implementation project(":eventmesh-common")
+    implementation project(":eventmesh-sdks:eventmesh-sdk-java")
+    implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
+
+    implementation 'com.google.guava:guava'
+    implementation "io.netty:netty-all"
+    implementation 'org.apache.httpcomponents:httpclient'
+
+    compileOnly 'org.projectlombok:lombok'
+    annotationProcessor 'org.projectlombok:lombok'
+
+    testImplementation "org.mockito:mockito-core"
+    testImplementation "org.mockito:mockito-junit-jupiter"
+    testImplementation "org.mockito:mockito-inline"
+}
\ No newline at end of file
diff --git 
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/config/WeComConnectServerConfig.java
 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/config/WeComConnectServerConfig.java
new file mode 100644
index 000000000..1f864726b
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/config/WeComConnectServerConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wecom.config;
+
+import org.apache.eventmesh.openconnect.api.config.Config;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class WeComConnectServerConfig extends Config {
+
+    private boolean sinkEnable;
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/config/WeComMessageTemplateType.java
 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/config/WeComMessageTemplateType.java
new file mode 100644
index 000000000..459d07894
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/config/WeComMessageTemplateType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wecom.config;
+
+import java.util.Arrays;
+
+public enum WeComMessageTemplateType {
+
+    PLAIN_TEXT("text"),
+    MARKDOWN("markdown");
+
+    private final String templateKey;
+
+    WeComMessageTemplateType(String templateKey) {
+        this.templateKey = templateKey;
+    }
+
+    public String getTemplateKey() {
+        return templateKey;
+    }
+
+    public static WeComMessageTemplateType of(String templateKey) {
+        return Arrays.stream(values())
+            .filter(v -> v.getTemplateKey().equals(templateKey))
+            .findFirst()
+            .orElseThrow(() -> new IllegalArgumentException("TemplateKey: " + templateKey + " not found."));
+    }
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/constants/ConnectRecordExtensionKeys.java
 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/constants/ConnectRecordExtensionKeys.java
new file mode 100644
index 000000000..7958a7375
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/constants/ConnectRecordExtensionKeys.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wecom.constants;
+
+/**
+ * Constants of record extension key.
+ */
+public interface ConnectRecordExtensionKeys {
+
+    String WECOM_MESSAGE_TEMPLATE_TYPE_KEY = "weCom:MessageTemplateTypeKey";
+
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/server/WeComConnectServer.java
 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/server/WeComConnectServer.java
new file mode 100644
index 000000000..f9c486168
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/server/WeComConnectServer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wecom.server;
+
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.connector.wecom.config.WeComConnectServerConfig;
+import org.apache.eventmesh.connector.wecom.sink.connector.WeComSinkConnector;
+import org.apache.eventmesh.openconnect.Application;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+public class WeComConnectServer {
+
+    public static void main(String[] args) throws Exception {
+
+        WeComConnectServerConfig weComConnectServerConfig = ConfigUtil.parse(WeComConnectServerConfig.class,
+            Constants.CONNECT_SERVER_CONFIG_FILE_NAME);
+
+        if (weComConnectServerConfig.isSinkEnable()) {
+            Application application = new Application();
+            application.run(WeComSinkConnector.class);
+        }
+    }
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/config/SinkConnectorConfig.java
 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/config/SinkConnectorConfig.java
new file mode 100644
index 000000000..013d5a8bb
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/config/SinkConnectorConfig.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wecom.sink.config;
+
+import lombok.Data;
+
+@Data
+public class SinkConnectorConfig {
+
+    private String connectorName;
+
+    private String robotWebhookKey;
+
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/config/WeComSinkConfig.java
 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/config/WeComSinkConfig.java
new file mode 100644
index 000000000..8af43bdbe
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/config/WeComSinkConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wecom.sink.config;
+
+import org.apache.eventmesh.openconnect.api.config.SinkConfig;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class WeComSinkConfig extends SinkConfig {
+
+    private SinkConnectorConfig sinkConnectorConfig;
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/SendMessageRequest.java
 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/SendMessageRequest.java
new file mode 100644
index 000000000..43f1f1657
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/SendMessageRequest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wecom.sink.connector;
+
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+@Data
+@Accessors(chain = true)
+public class SendMessageRequest {
+
+    @JsonProperty("msgtype")
+    private String messageType;
+
+    @JsonProperty("text")
+    private Map<String, Object> textContent;
+
+    @JsonProperty("markdown")
+    private Map<String, Object> markdownContent;
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/SendMessageResponse.java
 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/SendMessageResponse.java
new file mode 100644
index 000000000..0a64f326d
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/SendMessageResponse.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wecom.sink.connector;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Data;
+
+@Data
+public class SendMessageResponse {
+
+    @JsonProperty("errcode")
+    private int errorCode;
+
+    @JsonProperty("errmsg")
+    private String errorMessage;
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java
new file mode 100644
index 000000000..bf884916d
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wecom.sink.connector;
+
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.enums.EventMeshDataContentType;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.connector.wecom.config.WeComMessageTemplateType;
+import org.apache.eventmesh.connector.wecom.constants.ConnectRecordExtensionKeys;
+import org.apache.eventmesh.connector.wecom.sink.config.WeComSinkConfig;
+import org.apache.eventmesh.openconnect.api.config.Config;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
+import org.apache.eventmesh.openconnect.api.sink.Sink;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * WeCom sink connector.
+ * WeCom doc: <a href="https://developer.work.weixin.qq.com/document/path/90236">...</a>
+ */
+@Slf4j
+public class WeComSinkConnector implements Sink {
+
+    private static final String ROBOT_WEBHOOK_URL_PREFIX = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=";
+
+    private CloseableHttpClient httpClient;
+
+    private WeComSinkConfig sinkConfig;
+
+    private volatile boolean isRunning = false;
+
+    @Override
+    public Class<? extends Config> configClass() {
+        return WeComSinkConfig.class;
+    }
+
+    @Override
+    public void init(Config config) {
+        this.sinkConfig = (WeComSinkConfig) config;
+        httpClient = HttpClientBuilder.create().build();
+    }
+
+    @Override
+    public void init(ConnectorContext connectorContext) {
+        SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext;
+        this.sinkConfig = (WeComSinkConfig) sinkConnectorContext.getSinkConfig();
+        httpClient = HttpClientBuilder.create().build();
+    }
+
+    @Override
+    public void start() {
+        isRunning = true;
+    }
+
+    @Override
+    public void commit(ConnectRecord record) {
+
+    }
+
+    @Override
+    public String name() {
+        return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
+    }
+
+    @Override
+    public void stop() throws IOException {
+        isRunning = false;
+        httpClient.close();
+    }
+
+    public boolean isRunning() {
+        return isRunning;
+    }
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) {
+        for (ConnectRecord record : sinkRecords) {
+            try {
+                if (Objects.isNull(record.getData())) {
+                    log.warn("ConnectRecord data is null, ignore.");
+                    continue;
+                }
+                sendMessage(record);
+            } catch (Exception e) {
+                log.error("Failed to sink message to WeCom.", e);
+            }
+        }
+    }
+
+    /** Posts one record, decoded with Constants.DEFAULT_CHARSET, to the robot webhook; a non-zero errcode is an error. */
+    @SneakyThrows
+    private void sendMessage(ConnectRecord record) {
+        WeComMessageTemplateType templateType = WeComMessageTemplateType.of(
+            Optional.ofNullable(record.getExtension(ConnectRecordExtensionKeys.WECOM_MESSAGE_TEMPLATE_TYPE_KEY))
+                .orElse(WeComMessageTemplateType.PLAIN_TEXT.getTemplateKey()));
+        Map<String, Object> contentMap = new HashMap<>();
+        contentMap.put("content", new String((byte[]) record.getData(), Constants.DEFAULT_CHARSET));
+        SendMessageRequest request = new SendMessageRequest().setMessageType(templateType.getTemplateKey());
+        if (WeComMessageTemplateType.PLAIN_TEXT == templateType) {
+            request.setTextContent(contentMap);
+        } else if (WeComMessageTemplateType.MARKDOWN == templateType) {
+            request.setMarkdownContent(contentMap);
+        }
+        HttpPost httpPost = new HttpPost(ROBOT_WEBHOOK_URL_PREFIX + sinkConfig.getSinkConnectorConfig().getRobotWebhookKey());
+        httpPost.addHeader("Content-Type", EventMeshDataContentType.JSON.getCode());
+        httpPost.setEntity(new StringEntity(Objects.requireNonNull(JsonUtils.toJSONString(request)), ContentType.APPLICATION_JSON));
+        // try-with-resources closes the response (releasing its connection) even if reading or parsing the body fails.
+        try (CloseableHttpResponse httpResponse = httpClient.execute(httpPost)) {
+            String resultStr = EntityUtils.toString(httpResponse.getEntity(), Constants.DEFAULT_CHARSET);
+            SendMessageResponse sendMessageResponse = Objects.requireNonNull(JsonUtils.parseObject(resultStr, SendMessageResponse.class));
+            if (sendMessageResponse.getErrorCode() != 0) {
+                throw new IllegalStateException(String.format("Send message to weCom error! errorCode=%s, errorMessage=%s",
+                    sendMessageResponse.getErrorCode(), sendMessageResponse.getErrorMessage()));
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/resources/server-config.yml
 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/resources/server-config.yml
new file mode 100644
index 000000000..20d6e6d59
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/resources/server-config.yml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sinkEnable: true
diff --git 
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/resources/sink-config.yml
 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/resources/sink-config.yml
new file mode 100644
index 000000000..c21d5db0d
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/resources/sink-config.yml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+    meshAddress: 127.0.0.1:10000
+    subject: TEST-TOPIC-WECOM
+    idc: FT
+    env: PRD
+    group: weComSink
+    appId: 5034
+    userName: weComSinkUser
+    passWord: weComPassWord
+sinkConnectorConfig:
+    connectorName: weComSink
+    robotWebhookKey: weComRobotWebhookKey
diff --git 
a/eventmesh-connectors/eventmesh-connector-wecom/src/test/java/org/apache/eventmesh/connector/wecom/connector/WeComSinkConnectorTest.java
 
b/eventmesh-connectors/eventmesh-connector-wecom/src/test/java/org/apache/eventmesh/connector/wecom/connector/WeComSinkConnectorTest.java
new file mode 100644
index 000000000..de363346b
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wecom/src/test/java/org/apache/eventmesh/connector/wecom/connector/WeComSinkConnectorTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wecom.connector;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.connector.wecom.config.WeComMessageTemplateType;
+import org.apache.eventmesh.connector.wecom.constants.ConnectRecordExtensionKeys;
+import org.apache.eventmesh.connector.wecom.sink.config.WeComSinkConfig;
+import org.apache.eventmesh.connector.wecom.sink.connector.SendMessageResponse;
+import org.apache.eventmesh.connector.wecom.sink.connector.WeComSinkConnector;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.platform.commons.support.HierarchyTraversalMode;
+import org.junit.platform.commons.support.ReflectionSupport;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class WeComSinkConnectorTest {
+
+    private WeComSinkConnector connector;
+
+    @Mock
+    private CloseableHttpClient httpClient;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        connector = new WeComSinkConnector();
+        CloseableHttpResponse mockedResponse = Mockito.mock(CloseableHttpResponse.class);
+        HttpEntity httpEntity = Mockito.mock(HttpEntity.class);
+        Mockito.doReturn(mockedResponse).when(httpClient).execute(any(HttpPost.class));
+        Mockito.doReturn(httpEntity).when(mockedResponse).getEntity();
+        WeComSinkConfig sinkConfig = (WeComSinkConfig) ConfigUtil.parse(connector.configClass());
+        connector.init(sinkConfig);
+        Field httpClientField = ReflectionSupport.findFields(connector.getClass(),
+            (f) -> f.getName().equals("httpClient"),
+            HierarchyTraversalMode.BOTTOM_UP).get(0);
+        httpClientField.setAccessible(true);
+        httpClientField.set(connector, httpClient);
+        connector.start();
+    }
+
+    @Test
+    public void testSendMessageToWeCom() throws IOException {
+        try (MockedStatic<EntityUtils> entityUtilsMockedStatic = Mockito.mockStatic(EntityUtils.class)) {
+            entityUtilsMockedStatic.when(() -> EntityUtils.toString(any(HttpEntity.class), any(Charset.class)))
+                .thenReturn(JsonUtils.toJSONString(new SendMessageResponse()));
+            final int times = 3;
+            List<ConnectRecord> records = new ArrayList<>();
+            for (int i = 0; i < times; i++) {
+                RecordPartition partition = new RecordPartition();
+                RecordOffset offset = new RecordOffset();
+                ConnectRecord connectRecord = new ConnectRecord(partition, offset,
+                    System.currentTimeMillis(), "Hello, EventMesh!".getBytes(StandardCharsets.UTF_8));
+                connectRecord.addExtension(ConnectRecordExtensionKeys.WECOM_MESSAGE_TEMPLATE_TYPE_KEY,
+                    WeComMessageTemplateType.PLAIN_TEXT.getTemplateKey());
+                records.add(connectRecord);
+            }
+            connector.put(records);
+            verify(httpClient, times(times)).execute(any(HttpPost.class));
+        }
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        connector.stop();
+        httpClient.close();
+    }
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-wecom/src/test/resources/server-config.yml
 
b/eventmesh-connectors/eventmesh-connector-wecom/src/test/resources/server-config.yml
new file mode 100644
index 000000000..20d6e6d59
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wecom/src/test/resources/server-config.yml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sinkEnable: true
diff --git 
a/eventmesh-connectors/eventmesh-connector-wecom/src/test/resources/sink-config.yml
 
b/eventmesh-connectors/eventmesh-connector-wecom/src/test/resources/sink-config.yml
new file mode 100644
index 000000000..c21d5db0d
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wecom/src/test/resources/sink-config.yml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+    meshAddress: 127.0.0.1:10000
+    subject: TEST-TOPIC-WECOM
+    idc: FT
+    env: PRD
+    group: weComSink
+    appId: 5034
+    userName: weComSinkUser
+    passWord: weComPassWord
+sinkConnectorConfig:
+    connectorName: weComSink
+    robotWebhookKey: weComRobotWebhookKey
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
index 7766162e5..68a4f0537 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
@@ -95,7 +95,7 @@ public class ConnectRecord {
     }
 
     public String getExtension(String key) {
-        if (this.extensions == null) {
+        if (this.extensions == null || !extensions.containsKey(key)) {
             return null;
         }
         return this.extensions.getString(key);
diff --git a/settings.gradle b/settings.gradle
index 414b086ab..85877303e 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -46,6 +46,7 @@ include 'eventmesh-connectors:eventmesh-connector-spring'
 include 'eventmesh-connectors:eventmesh-connector-prometheus'
 include 'eventmesh-connectors:eventmesh-connector-dingding'
 include 'eventmesh-connectors:eventmesh-connector-feishu'
+include 'eventmesh-connectors:eventmesh-connector-wecom'
 
 include 'eventmesh-storage-plugin:eventmesh-storage-api'
 include 'eventmesh-storage-plugin:eventmesh-storage-standalone'
@@ -95,4 +96,3 @@ include 'eventmesh-webhook:eventmesh-webhook-receive'
 include 'eventmesh-retry'
 include 'eventmesh-retry:eventmesh-retry-api'
 include 'eventmesh-retry:eventmesh-retry-rocketmq'
-


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to