This is an automated email from the ASF dual-hosted git repository.
pandaapo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new ad4ecf3d8 [ISSUE#4412] Add WeCom sink connector. (#4558)
ad4ecf3d8 is described below
commit ad4ecf3d8aa8ac5c66712fc6ad9aad9862ff80b4
Author: yanrongzhen <[email protected]>
AuthorDate: Sun Nov 19 18:15:27 2023 +0800
[ISSUE#4412] Add WeCom sink connector. (#4558)
* Add WeCom sink connector.
* Refactor: use robot webhook instead of app notification.
* fix: Add test resources.
* fix: code review
* fix code review
---
.../eventmesh-connector-wecom/build.gradle | 39 ++++++
.../wecom/config/WeComConnectServerConfig.java | 30 +++++
.../wecom/config/WeComMessageTemplateType.java | 43 ++++++
.../constants/ConnectRecordExtensionKeys.java | 27 ++++
.../connector/wecom/server/WeComConnectServer.java | 38 ++++++
.../wecom/sink/config/SinkConnectorConfig.java | 29 ++++
.../wecom/sink/config/WeComSinkConfig.java | 30 +++++
.../wecom/sink/connector/SendMessageRequest.java | 39 ++++++
.../wecom/sink/connector/SendMessageResponse.java | 32 +++++
.../wecom/sink/connector/WeComSinkConnector.java | 150 +++++++++++++++++++++
.../src/main/resources/server-config.yml | 18 +++
.../src/main/resources/sink-config.yml | 29 ++++
.../wecom/connector/WeComSinkConnectorTest.java | 110 +++++++++++++++
.../src/test/resources/server-config.yml | 18 +++
.../src/test/resources/sink-config.yml | 29 ++++
.../offsetmgmt/api/data/ConnectRecord.java | 2 +-
settings.gradle | 2 +-
17 files changed, 663 insertions(+), 2 deletions(-)
diff --git a/eventmesh-connectors/eventmesh-connector-wecom/build.gradle
b/eventmesh-connectors/eventmesh-connector-wecom/build.gradle
new file mode 100644
index 000000000..746a4f372
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-wecom/build.gradle
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+configurations {
+ implementation.exclude group: 'ch.qos.logback', module: 'logback-classic'
+ implementation.exclude group: 'log4j', module: 'log4j'
+ testImplementation.exclude group: 'org.apache.logging.log4j', module:
'log4j-to-slf4j'
+}
+
+dependencies {
+ implementation project(":eventmesh-common")
+ implementation project(":eventmesh-sdks:eventmesh-sdk-java")
+ implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
+
+ implementation 'com.google.guava:guava'
+ implementation "io.netty:netty-all"
+ implementation 'org.apache.httpcomponents:httpclient'
+
+ compileOnly 'org.projectlombok:lombok'
+ annotationProcessor 'org.projectlombok:lombok'
+
+ testImplementation "org.mockito:mockito-core"
+ testImplementation "org.mockito:mockito-junit-jupiter"
+ testImplementation "org.mockito:mockito-inline"
+}
\ No newline at end of file
diff --git
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/config/WeComConnectServerConfig.java
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/config/WeComConnectServerConfig.java
new file mode 100644
index 000000000..1f864726b
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/config/WeComConnectServerConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wecom.config;
+
+import org.apache.eventmesh.openconnect.api.config.Config;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class WeComConnectServerConfig extends Config {
+
+ private boolean sinkEnable;
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/config/WeComMessageTemplateType.java
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/config/WeComMessageTemplateType.java
new file mode 100644
index 000000000..459d07894
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/config/WeComMessageTemplateType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wecom.config;
+
+import java.util.Arrays;
+
+public enum WeComMessageTemplateType {
+
+ PLAIN_TEXT("text"),
+ MARKDOWN("markdown");
+
+ private final String templateKey;
+
+ WeComMessageTemplateType(String templateKey) {
+ this.templateKey = templateKey;
+ }
+
+ public String getTemplateKey() {
+ return templateKey;
+ }
+
+ public static WeComMessageTemplateType of(String templateKey) {
+ return Arrays.stream(values())
+ .filter(v -> v.getTemplateKey().equals(templateKey))
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException("TemplateKey: " +
templateKey + " not found."));
+ }
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/constants/ConnectRecordExtensionKeys.java
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/constants/ConnectRecordExtensionKeys.java
new file mode 100644
index 000000000..7958a7375
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/constants/ConnectRecordExtensionKeys.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wecom.constants;
+
+/**
+ * Constants of record extension key.
+ */
+public interface ConnectRecordExtensionKeys {
+
+ String WECOM_MESSAGE_TEMPLATE_TYPE_KEY = "weCom:MessageTemplateTypeKey";
+
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/server/WeComConnectServer.java
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/server/WeComConnectServer.java
new file mode 100644
index 000000000..f9c486168
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/server/WeComConnectServer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wecom.server;
+
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.connector.wecom.config.WeComConnectServerConfig;
+import org.apache.eventmesh.connector.wecom.sink.connector.WeComSinkConnector;
+import org.apache.eventmesh.openconnect.Application;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+public class WeComConnectServer {
+
+ public static void main(String[] args) throws Exception {
+
+ WeComConnectServerConfig weComConnectServerConfig =
ConfigUtil.parse(WeComConnectServerConfig.class,
+ Constants.CONNECT_SERVER_CONFIG_FILE_NAME);
+
+ if (weComConnectServerConfig.isSinkEnable()) {
+ Application application = new Application();
+ application.run(WeComSinkConnector.class);
+ }
+ }
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/config/SinkConnectorConfig.java
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/config/SinkConnectorConfig.java
new file mode 100644
index 000000000..013d5a8bb
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/config/SinkConnectorConfig.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wecom.sink.config;
+
+import lombok.Data;
+
+@Data
+public class SinkConnectorConfig {
+
+ private String connectorName;
+
+ private String robotWebhookKey;
+
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/config/WeComSinkConfig.java
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/config/WeComSinkConfig.java
new file mode 100644
index 000000000..8af43bdbe
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/config/WeComSinkConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wecom.sink.config;
+
+import org.apache.eventmesh.openconnect.api.config.SinkConfig;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class WeComSinkConfig extends SinkConfig {
+
+ private SinkConnectorConfig sinkConnectorConfig;
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/SendMessageRequest.java
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/SendMessageRequest.java
new file mode 100644
index 000000000..43f1f1657
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/SendMessageRequest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wecom.sink.connector;
+
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+@Data
+@Accessors(chain = true)
+public class SendMessageRequest {
+
+ @JsonProperty("msgtype")
+ private String messageType;
+
+ @JsonProperty("text")
+ private Map<String, Object> textContent;
+
+ @JsonProperty("markdown")
+ private Map<String, Object> markdownContent;
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/SendMessageResponse.java
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/SendMessageResponse.java
new file mode 100644
index 000000000..0a64f326d
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/SendMessageResponse.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wecom.sink.connector;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Data;
+
+@Data
+public class SendMessageResponse {
+
+ @JsonProperty("errcode")
+ private int errorCode;
+
+ @JsonProperty("errmsg")
+ private String errorMessage;
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java
new file mode 100644
index 000000000..bf884916d
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/java/org/apache/eventmesh/connector/wecom/sink/connector/WeComSinkConnector.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wecom.sink.connector;
+
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.enums.EventMeshDataContentType;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.connector.wecom.config.WeComMessageTemplateType;
+import
org.apache.eventmesh.connector.wecom.constants.ConnectRecordExtensionKeys;
+import org.apache.eventmesh.connector.wecom.sink.config.WeComSinkConfig;
+import org.apache.eventmesh.openconnect.api.config.Config;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
+import org.apache.eventmesh.openconnect.api.sink.Sink;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * WeCom sink connector.
+ * WeCom doc: <a
href="https://developer.work.weixin.qq.com/document/path/90236">...</a>
+ */
+@Slf4j
+public class WeComSinkConnector implements Sink {
+
+ private static final String ROBOT_WEBHOOK_URL_PREFIX =
"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=";
+
+ private CloseableHttpClient httpClient;
+
+ private WeComSinkConfig sinkConfig;
+
+ private volatile boolean isRunning = false;
+
+ @Override
+ public Class<? extends Config> configClass() {
+ return WeComSinkConfig.class;
+ }
+
+ @Override
+ public void init(Config config) {
+ this.sinkConfig = (WeComSinkConfig) config;
+ httpClient = HttpClientBuilder.create().build();
+ }
+
+ @Override
+ public void init(ConnectorContext connectorContext) {
+ SinkConnectorContext sinkConnectorContext = (SinkConnectorContext)
connectorContext;
+ this.sinkConfig = (WeComSinkConfig)
sinkConnectorContext.getSinkConfig();
+ httpClient = HttpClientBuilder.create().build();
+ }
+
+ @Override
+ public void start() {
+ isRunning = true;
+ }
+
+ @Override
+ public void commit(ConnectRecord record) {
+
+ }
+
+ @Override
+ public String name() {
+ return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
+ }
+
+ @Override
+ public void stop() throws IOException {
+ isRunning = false;
+ httpClient.close();
+ }
+
+ public boolean isRunning() {
+ return isRunning;
+ }
+
+ @Override
+ public void put(List<ConnectRecord> sinkRecords) {
+ for (ConnectRecord record : sinkRecords) {
+ try {
+ if (Objects.isNull(record.getData())) {
+ log.warn("ConnectRecord data is null, ignore.");
+ continue;
+ }
+ sendMessage(record);
+ } catch (Exception e) {
+ log.error("Failed to sink message to WeCom.", e);
+ }
+ }
+ }
+
+ @SneakyThrows
+ private void sendMessage(ConnectRecord record) {
+ final String target = ROBOT_WEBHOOK_URL_PREFIX +
sinkConfig.getSinkConnectorConfig().getRobotWebhookKey();
+ SendMessageRequest request = new SendMessageRequest();
+ HttpPost httpPost = new HttpPost(target);
+ httpPost.addHeader("Content-Type",
EventMeshDataContentType.JSON.getCode());
+ WeComMessageTemplateType templateType = WeComMessageTemplateType.of(
+
Optional.ofNullable(record.getExtension(ConnectRecordExtensionKeys.WECOM_MESSAGE_TEMPLATE_TYPE_KEY))
+ .orElse(WeComMessageTemplateType.PLAIN_TEXT.getTemplateKey()));
+ Map<String, Object> contentMap = new HashMap<>();
+ if (WeComMessageTemplateType.PLAIN_TEXT == templateType) {
+ contentMap.put("content", new String((byte[]) record.getData()));
+ request.setTextContent(contentMap);
+ } else if (WeComMessageTemplateType.MARKDOWN == templateType) {
+ contentMap.put("content", new String((byte[]) record.getData()));
+ request.setMarkdownContent(contentMap);
+ }
+ request.setMessageType(templateType.getTemplateKey());
+ httpPost.setEntity(new
StringEntity(Objects.requireNonNull(JsonUtils.toJSONString(request)),
ContentType.APPLICATION_JSON));
+ CloseableHttpResponse httpResponse = httpClient.execute(httpPost);
+ String resultStr = EntityUtils.toString(httpResponse.getEntity(),
Constants.DEFAULT_CHARSET);
+ SendMessageResponse sendMessageResponse =
Objects.requireNonNull(JsonUtils.parseObject(resultStr,
SendMessageResponse.class));
+ if (sendMessageResponse.getErrorCode() != 0) {
+ throw new IllegalAccessException(String.format("Send message to
weCom error! errorCode=%s, errorMessage=%s",
+ sendMessageResponse.getErrorCode(),
sendMessageResponse.getErrorMessage()));
+ }
+ }
+}
\ No newline at end of file
diff --git
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/resources/server-config.yml
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/resources/server-config.yml
new file mode 100644
index 000000000..20d6e6d59
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/resources/server-config.yml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sinkEnable: true
diff --git
a/eventmesh-connectors/eventmesh-connector-wecom/src/main/resources/sink-config.yml
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/resources/sink-config.yml
new file mode 100644
index 000000000..c21d5db0d
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wecom/src/main/resources/sink-config.yml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+ meshAddress: 127.0.0.1:10000
+ subject: TEST-TOPIC-WECOM
+ idc: FT
+ env: PRD
+ group: weComSink
+ appId: 5034
+ userName: weComSinkUser
+ passWord: weComPassWord
+sinkConnectorConfig:
+ connectorName: weComSink
+ robotWebhookKey: weComRobotWebhookKey
diff --git
a/eventmesh-connectors/eventmesh-connector-wecom/src/test/java/org/apache/eventmesh/connector/wecom/connector/WeComSinkConnectorTest.java
b/eventmesh-connectors/eventmesh-connector-wecom/src/test/java/org/apache/eventmesh/connector/wecom/connector/WeComSinkConnectorTest.java
new file mode 100644
index 000000000..de363346b
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wecom/src/test/java/org/apache/eventmesh/connector/wecom/connector/WeComSinkConnectorTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wecom.connector;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.connector.wecom.config.WeComMessageTemplateType;
+import
org.apache.eventmesh.connector.wecom.constants.ConnectRecordExtensionKeys;
+import org.apache.eventmesh.connector.wecom.sink.config.WeComSinkConfig;
+import org.apache.eventmesh.connector.wecom.sink.connector.SendMessageResponse;
+import org.apache.eventmesh.connector.wecom.sink.connector.WeComSinkConnector;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.platform.commons.support.HierarchyTraversalMode;
+import org.junit.platform.commons.support.ReflectionSupport;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class WeComSinkConnectorTest {
+
+ private WeComSinkConnector connector;
+
+ @Mock
+ private CloseableHttpClient httpClient;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ connector = new WeComSinkConnector();
+ CloseableHttpResponse mockedResponse =
Mockito.mock(CloseableHttpResponse.class);
+ HttpEntity httpEntity = Mockito.mock(HttpEntity.class);
+
Mockito.doReturn(mockedResponse).when(httpClient).execute(any(HttpPost.class));
+ Mockito.doReturn(httpEntity).when(mockedResponse).getEntity();
+ WeComSinkConfig sinkConfig = (WeComSinkConfig)
ConfigUtil.parse(connector.configClass());
+ connector.init(sinkConfig);
+ Field httpClientField =
ReflectionSupport.findFields(connector.getClass(),
+ (f) -> f.getName().equals("httpClient"),
+ HierarchyTraversalMode.BOTTOM_UP).get(0);
+ httpClientField.setAccessible(true);
+ httpClientField.set(connector, httpClient);
+ connector.start();
+ }
+
+ @Test
+ public void testSendMessageToWeCom() throws IOException {
+ try (MockedStatic<EntityUtils> entityUtilsMockedStatic =
Mockito.mockStatic(EntityUtils.class)) {
+ entityUtilsMockedStatic.when(() ->
EntityUtils.toString(any(HttpEntity.class), any(Charset.class)))
+ .thenReturn(JsonUtils.toJSONString(new SendMessageResponse()));
+ final int times = 3;
+ List<ConnectRecord> records = new ArrayList<>();
+ for (int i = 0; i < times; i++) {
+ RecordPartition partition = new RecordPartition();
+ RecordOffset offset = new RecordOffset();
+ ConnectRecord connectRecord = new ConnectRecord(partition,
offset,
+ System.currentTimeMillis(), "Hello,
EventMesh!".getBytes(StandardCharsets.UTF_8));
+
connectRecord.addExtension(ConnectRecordExtensionKeys.WECOM_MESSAGE_TEMPLATE_TYPE_KEY,
+ WeComMessageTemplateType.PLAIN_TEXT.getTemplateKey());
+ records.add(connectRecord);
+ }
+ connector.put(records);
+ verify(httpClient, times(times)).execute(any(HttpPost.class));
+ }
+ }
+
+ @AfterEach
+ public void tearDown() throws IOException {
+ connector.stop();
+ httpClient.close();
+ }
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-wecom/src/test/resources/server-config.yml
b/eventmesh-connectors/eventmesh-connector-wecom/src/test/resources/server-config.yml
new file mode 100644
index 000000000..20d6e6d59
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wecom/src/test/resources/server-config.yml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sinkEnable: true
diff --git
a/eventmesh-connectors/eventmesh-connector-wecom/src/test/resources/sink-config.yml
b/eventmesh-connectors/eventmesh-connector-wecom/src/test/resources/sink-config.yml
new file mode 100644
index 000000000..c21d5db0d
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wecom/src/test/resources/sink-config.yml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+ meshAddress: 127.0.0.1:10000
+ subject: TEST-TOPIC-WECOM
+ idc: FT
+ env: PRD
+ group: weComSink
+ appId: 5034
+ userName: weComSinkUser
+ passWord: weComPassWord
+sinkConnectorConfig:
+ connectorName: weComSink
+ robotWebhookKey: weComRobotWebhookKey
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
index 7766162e5..68a4f0537 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
@@ -95,7 +95,7 @@ public class ConnectRecord {
}
public String getExtension(String key) {
- if (this.extensions == null) {
+ if (this.extensions == null || !extensions.containsKey(key)) {
return null;
}
return this.extensions.getString(key);
diff --git a/settings.gradle b/settings.gradle
index 414b086ab..85877303e 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -46,6 +46,7 @@ include 'eventmesh-connectors:eventmesh-connector-spring'
include 'eventmesh-connectors:eventmesh-connector-prometheus'
include 'eventmesh-connectors:eventmesh-connector-dingding'
include 'eventmesh-connectors:eventmesh-connector-feishu'
+include 'eventmesh-connectors:eventmesh-connector-wecom'
include 'eventmesh-storage-plugin:eventmesh-storage-api'
include 'eventmesh-storage-plugin:eventmesh-storage-standalone'
@@ -95,4 +96,3 @@ include 'eventmesh-webhook:eventmesh-webhook-receive'
include 'eventmesh-retry'
include 'eventmesh-retry:eventmesh-retry-api'
include 'eventmesh-retry:eventmesh-retry-rocketmq'
-
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]