This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new d463fa713 [ISSUE #4410]add wechat sink connector (#4594)
d463fa713 is described below
commit d463fa71310fad03798941d03c8065829c14e38b
Author: wizardzhang <[email protected]>
AuthorDate: Sun Dec 3 17:42:46 2023 +0800
[ISSUE #4410]add wechat sink connector (#4594)
* add wechat connector module and dependency
* add wechat connector
* add unit test
* fix check style error and downgrade okhttp version to 3.14.9
* add stop method after test end
* fix check style
---
build.gradle | 2 +
.../eventmesh-connector-wechat/build.gradle | 42 +++++
.../wechat/config/WeChatConnectServerConfig.java | 30 +++
.../wechat/server/WeChatConnectServer.java | 38 ++++
.../wechat/sink/config/SinkConnectorConfig.java | 30 +++
.../wechat/sink/config/WeChatSinkConfig.java | 30 +++
.../sink/connector/TemplateMessageResponse.java | 30 +++
.../wechat/sink/connector/WeChatSinkConnector.java | 202 +++++++++++++++++++++
.../src/main/resources/server-config.yml | 18 ++
.../src/main/resources/sink-config.yml | 30 +++
.../sink/connector/WeChatSinkConnectorTest.java | 130 +++++++++++++
.../src/test/resources/server-config.yml | 18 ++
.../src/test/resources/sink-config.yml | 30 +++
settings.gradle | 1 +
14 files changed, 631 insertions(+)
diff --git a/build.gradle b/build.gradle
index 798e5ad5f..2c11cb437 100644
--- a/build.gradle
+++ b/build.gradle
@@ -499,6 +499,8 @@ subprojects {
dependency
"com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.0"
dependency
"com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.0"
+ dependency "com.squareup.okhttp3:okhttp:3.14.9"
+
dependency "org.asynchttpclient:async-http-client:2.12.0"
dependency "org.apache.httpcomponents:httpclient:4.5.13"
diff --git a/eventmesh-connectors/eventmesh-connector-wechat/build.gradle
b/eventmesh-connectors/eventmesh-connector-wechat/build.gradle
new file mode 100644
index 000000000..c79152448
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-wechat/build.gradle
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Build script for the WeChat connector module.
+configurations {
+ // Keep these logging artifacts off the module's classpaths
+ // (presumably to avoid clashing logging bindings -- confirm against the other connector modules).
+ implementation.exclude group: 'ch.qos.logback', module: 'logback-classic'
+ implementation.exclude group: 'log4j', module: 'log4j'
+ testImplementation.exclude group: 'org.apache.logging.log4j', module:
'log4j-to-slf4j'
+}
+
+dependencies {
+ // EventMesh modules the connector is built on.
+ implementation project(":eventmesh-common")
+ implementation project(":eventmesh-sdks:eventmesh-sdk-java")
+ implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
+
+ // Third-party libraries declared without versions; the okhttp version is pinned in the root build.gradle.
+ implementation 'com.alibaba:fastjson'
+ implementation 'com.google.guava:guava'
+ implementation 'com.squareup.okhttp3:okhttp'
+
+ implementation "io.netty:netty-all"
+
+
+ // Lombok is needed at compile time only (annotation processing).
+ compileOnly 'org.projectlombok:lombok'
+ annotationProcessor 'org.projectlombok:lombok'
+
+ // Mockito is used by WeChatSinkConnectorTest to mock the OkHttp client.
+ testImplementation "org.mockito:mockito-core"
+ testImplementation "org.mockito:mockito-junit-jupiter"
+ testImplementation "org.mockito:mockito-inline"
+}
\ No newline at end of file
diff --git
a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/config/WeChatConnectServerConfig.java
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/config/WeChatConnectServerConfig.java
new file mode 100644
index 000000000..a2634b6b4
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/config/WeChatConnectServerConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wechat.config;
+
+import org.apache.eventmesh.openconnect.api.config.Config;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/**
+ * Server-level configuration of the WeChat connector process.
+ *
+ * <p>{@code WeChatConnectServer} parses it from the file named by {@code Constants.CONNECT_SERVER_CONFIG_FILE_NAME}.
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class WeChatConnectServerConfig extends Config {
+
+ /** When {@code true}, {@code WeChatConnectServer} starts the WeChat sink connector. */
+ private boolean sinkEnable;
+}
\ No newline at end of file
diff --git
a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/server/WeChatConnectServer.java
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/server/WeChatConnectServer.java
new file mode 100644
index 000000000..0ea1927c1
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/server/WeChatConnectServer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wechat.server;
+
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.connector.wechat.config.WeChatConnectServerConfig;
+import
org.apache.eventmesh.connector.wechat.sink.connector.WeChatSinkConnector;
+import org.apache.eventmesh.openconnect.Application;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+/**
+ * Command-line entry point of the WeChat connector.
+ */
+public class WeChatConnectServer {
+
+    /**
+     * Reads the connect-server configuration and, when the sink is enabled, runs {@link WeChatSinkConnector}
+     * inside an {@link Application}.
+     *
+     * @param args command-line arguments (not used)
+     * @throws Exception if the configuration cannot be parsed or the application fails to run
+     */
+    public static void main(String[] args) throws Exception {
+        final WeChatConnectServerConfig serverConfig =
+            ConfigUtil.parse(WeChatConnectServerConfig.class, Constants.CONNECT_SERVER_CONFIG_FILE_NAME);
+
+        // Nothing to launch unless the sink side is switched on.
+        if (!serverConfig.isSinkEnable()) {
+            return;
+        }
+
+        new Application().run(WeChatSinkConnector.class);
+    }
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/config/SinkConnectorConfig.java
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/config/SinkConnectorConfig.java
new file mode 100644
index 000000000..e575e65c6
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/config/SinkConnectorConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wechat.sink.config;
+
+import lombok.Data;
+
+/**
+ * WeChat-specific settings of the sink connector (the {@code sinkConnectorConfig} section of sink-config.yml).
+ */
+@Data
+public class SinkConnectorConfig {
+
+ /** Name reported by {@code WeChatSinkConnector#name()}. */
+ private String connectorName;
+
+ /** WeChat application id; sent as the {@code appid} query parameter when requesting an access token. */
+ private String appId;
+
+ /** WeChat application secret; sent as the {@code secret} query parameter when requesting an access token. */
+ private String appSecret;
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/config/WeChatSinkConfig.java
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/config/WeChatSinkConfig.java
new file mode 100644
index 000000000..a77e1c77b
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/config/WeChatSinkConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wechat.sink.config;
+
+import org.apache.eventmesh.openconnect.api.config.SinkConfig;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/**
+ * Sink configuration of the WeChat connector: the inherited {@link SinkConfig} settings plus the WeChat-specific section.
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class WeChatSinkConfig extends SinkConfig {
+
+ /** WeChat-specific settings (connector name, app id and app secret). */
+ private SinkConnectorConfig sinkConnectorConfig;
+}
\ No newline at end of file
diff --git
a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/TemplateMessageResponse.java
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/TemplateMessageResponse.java
new file mode 100644
index 000000000..3d51704e0
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/TemplateMessageResponse.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wechat.sink.connector;
+
+import lombok.Data;
+
+@Data
+public class TemplateMessageResponse {
+
+ private int errocode;
+
+ private String errmsg;
+
+ private String msgid;
+}
\ No newline at end of file
diff --git
a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java
new file mode 100644
index 000000000..0457202b6
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wechat.sink.connector;
+
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.connector.wechat.sink.config.WeChatSinkConfig;
+import org.apache.eventmesh.openconnect.api.config.Config;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
+import org.apache.eventmesh.openconnect.api.sink.Sink;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+
+/**
+ * WeChat sink connector. WeChat doc: <a
href="https://developer.work.weixin.qq.com/document/path/90236">...</a>
+ */
+@Slf4j
+public class WeChatSinkConnector implements Sink {
+
+ public static final Cache<String, String> ACCESS_TOKEN_CACHE =
CacheBuilder.newBuilder()
+ .initialCapacity(12)
+ .maximumSize(10)
+ .concurrencyLevel(5)
+ .expireAfterWrite(120, TimeUnit.MINUTES)
+ .build();
+
+ public static final String ACCESS_TOKEN_CACHE_KEY = "access_token";
+
+ private static final String ACCESS_TOKEN_URL =
"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid=%s&secret=%s";
+
+ private static final String MESSAGE_SEND_URL =
"https://api.weixin.qq.com/cgi-bin/message/template/send?access_token=%s";
+
+ private OkHttpClient okHttpClient;
+
+ private WeChatSinkConfig sinkConfig;
+
+ private volatile boolean isRunning = false;
+
+ @Override
+ public Class<? extends Config> configClass() {
+ return WeChatSinkConfig.class;
+ }
+
+ @Override
+ public void init(Config config) {
+ this.sinkConfig = (WeChatSinkConfig) config;
+ okHttpClient = new OkHttpClient.Builder()
+ .connectTimeout(60, TimeUnit.SECONDS)
+ .readTimeout(60, TimeUnit.SECONDS)
+ .writeTimeout(60, TimeUnit.SECONDS)
+ .retryOnConnectionFailure(true)
+ .build();
+ }
+
+ @Override
+ public void init(ConnectorContext connectorContext) {
+ SinkConnectorContext sinkConnectorContext = (SinkConnectorContext)
connectorContext;
+ this.sinkConfig = (WeChatSinkConfig)
sinkConnectorContext.getSinkConfig();
+ okHttpClient = new OkHttpClient.Builder()
+ .connectTimeout(60, TimeUnit.SECONDS)
+ .readTimeout(60, TimeUnit.SECONDS)
+ .writeTimeout(60, TimeUnit.SECONDS)
+ .retryOnConnectionFailure(true)
+ .build();
+ }
+
+ @Override
+ public void start() {
+ isRunning = true;
+ }
+
+ @Override
+ public void commit(ConnectRecord record) {
+
+ }
+
+ @Override
+ public String name() {
+ return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
+ }
+
+ @Override
+ public void stop() throws IOException {
+ isRunning = false;
+ }
+
+ public boolean isRunning() {
+ return isRunning;
+ }
+
+ @Override
+ public void put(List<ConnectRecord> sinkRecords) {
+ for (ConnectRecord record : sinkRecords) {
+ try {
+ if (Objects.isNull(record.getData())) {
+ log.warn("ConnectRecord data is null, ignore.");
+ continue;
+ }
+ sendMessage(record);
+ } catch (Exception e) {
+ log.error("Failed to sink message to WeChat.", e);
+ }
+ }
+ }
+
+ @SneakyThrows
+ private void sendMessage(ConnectRecord record) {
+ // get access token
+ String accessToken = getAccessToken();
+ MediaType mediaType = MediaType.parse("application/json;
charset=utf-8");
+ RequestBody body = RequestBody.create(mediaType, new String((byte[])
record.getData()));
+ Request request = new Request.Builder()
+ .url(String.format(MESSAGE_SEND_URL, accessToken))
+ .post(body)
+ .build();
+ try (Response response = okHttpClient.newCall(request).execute()) {
+ if (!response.isSuccessful()) {
+ log.error("server response: {}",
ToStringBuilder.reflectionToString(response));
+ throw new IOException("Unexpected code " + response);
+ }
+
+ ResponseBody responseBody = response.body();
+ if (responseBody == null) {
+ throw new IOException("Response body is null.");
+ }
+
+ String jsonStr = responseBody.string();
+ TemplateMessageResponse messageResponse =
JsonUtils.parseObject(jsonStr, TemplateMessageResponse.class);
+ if (messageResponse == null) {
+ throw new IOException("message response is null.");
+ }
+
+ if (messageResponse.getErrocode() != 0) {
+ throw new IllegalAccessException(String.format("Send message
to weCom error! errorCode=%s, errorMessage=%s",
+ messageResponse.getErrocode(),
messageResponse.getErrmsg()));
+ }
+ }
+
+ }
+
+ @SneakyThrows
+ private String getAccessToken() {
+ return ACCESS_TOKEN_CACHE.get(ACCESS_TOKEN_CACHE_KEY,
+ () -> {
+ Request tokenRequest = new Request.Builder()
+ .url(String.format(ACCESS_TOKEN_URL,
sinkConfig.getSinkConnectorConfig().getAppId(),
+ sinkConfig.getSinkConnectorConfig().getAppSecret()))
+ .get()
+ .build();
+ String accessToken;
+ try (Response response =
okHttpClient.newCall(tokenRequest).execute()) {
+ if (!response.isSuccessful()) {
+ log.error("server response: {}",
ToStringBuilder.reflectionToString(response));
+ throw new IOException("Unexpected code " + response);
+ }
+
+ String json =
Objects.requireNonNull(response.body()).string();
+ JSONObject jsonObject = JSON.parseObject(json);
+ accessToken =
Objects.requireNonNull(jsonObject).getString(ACCESS_TOKEN_CACHE_KEY);
+ ACCESS_TOKEN_CACHE.put(ACCESS_TOKEN_CACHE_KEY,
accessToken);
+ }
+
+ return accessToken;
+ });
+ }
+}
\ No newline at end of file
diff --git
a/eventmesh-connectors/eventmesh-connector-wechat/src/main/resources/server-config.yml
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/resources/server-config.yml
new file mode 100644
index 000000000..20d6e6d59
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/resources/server-config.yml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# When true, WeChatConnectServer starts the WeChat sink connector.
+sinkEnable: true
diff --git
a/eventmesh-connectors/eventmesh-connector-wechat/src/main/resources/sink-config.yml
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/resources/sink-config.yml
new file mode 100644
index 000000000..6f8face36
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/resources/sink-config.yml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Settings inherited from the generic sink configuration
+# (presumably the EventMesh runtime address and subscription identity -- the schema is defined outside this module).
+pubSubConfig:
+ meshAddress: 127.0.0.1:10000
+ subject: TEST-TOPIC-WECHAT
+ idc: FT
+ env: PRD
+ group: weChatSink
+ appId: 5034
+ userName: weChatSinkUser
+ passWord: weChatPassWord
+# WeChat-specific settings. appId and appSecret are used to request the WeChat access token;
+# the values below look like placeholders and must be replaced with real credentials.
+sinkConnectorConfig:
+ connectorName: weChatSink
+ appId: weChatAppId
+ appSecret: weChatAppSecret
diff --git
a/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java
b/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java
new file mode 100644
index 000000000..2c4ad5b01
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wechat.sink.connector;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.eventmesh.connector.wechat.sink.config.WeChatSinkConfig;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.platform.commons.support.HierarchyTraversalMode;
+import org.junit.platform.commons.support.ReflectionSupport;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import okhttp3.Call;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Protocol;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+
+
+
+/**
+ * Unit test of {@link WeChatSinkConnector} that replaces the connector's HTTP client with a Mockito mock, so no
+ * request leaves the JVM.
+ */
+@ExtendWith(MockitoExtension.class)
+public class WeChatSinkConnectorTest {
+
+ private WeChatSinkConnector weChatSinkConnector;
+
+ @Mock
+ private OkHttpClient okHttpClient;
+
+ /**
+ * Stubs the token and send-message HTTP calls, creates the connector from the test sink-config.yml and injects
+ * the mocked client into its private {@code okHttpClient} field.
+ */
+ @BeforeEach
+ public void setUp() throws Exception {
+ // Canned answer of the access-token endpoint.
+ Request tokenRequest = new
Request.Builder().url("https://api.weixin.qq.com/cgi-bin/token").build();
+ String tokenResponseJson =
"{\"access_token\":\"ACCESS_TOKEN\",\"expires_in\":7200}";
+ ResponseBody responseBody =
ResponseBody.create(MediaType.parse("application/json; charset=utf-8"),
tokenResponseJson);
+ Response tokenResponse = new Response.Builder()
+ .request(tokenRequest)
+ .protocol(Protocol.HTTP_1_0)
+ .message("ok")
+ .code(200)
+ .body(responseBody)
+ .build();
+ // Matches a request when the stubbed path "/cgi-bin/token" starts with the path of the actual request.
+ ArgumentMatcher<Request> tokenMatcher = (anyRequest) ->
tokenRequest.url().encodedPath().startsWith(anyRequest.url().encodedPath());
+ Call tokenCall = Mockito.mock(Call.class);
+
Mockito.doReturn(tokenCall).when(okHttpClient).newCall(Mockito.argThat(tokenMatcher));
+ Mockito.doReturn(tokenResponse).when(tokenCall).execute();
+
+ // Canned answer of the template-message endpoint (errcode 0).
+ Request sendMessageRequest = new
Request.Builder().url("https://api.weixin.qq.com/cgi-bin/message/template/send").build();
+ String sendMessageResponseJson =
"{\"errcode\":0,\"errmsg\":\"ok\",\"msgid\":200228332}";
+ ResponseBody sendMessageBody =
ResponseBody.create(MediaType.parse("application/json; charset=utf-8"),
sendMessageResponseJson);
+ Response sendMessageResponse = new Response.Builder()
+ .code(200)
+ .protocol(Protocol.HTTP_1_0)
+ .request(sendMessageRequest)
+ .body(sendMessageBody)
+ .message("ok")
+ .build();
+ // Same prefix rule as above, applied to "/cgi-bin/message/template/send".
+ ArgumentMatcher<Request> sendMessageMatcher = (anyRequest) ->
+
sendMessageRequest.url().encodedPath().startsWith(anyRequest.url().encodedPath());
+ Call sendMessageRequestCall = Mockito.mock(Call.class);
+
Mockito.doReturn(sendMessageRequestCall).when(okHttpClient).newCall(Mockito.argThat(sendMessageMatcher));
+
Mockito.doReturn(sendMessageResponse).when(sendMessageRequestCall).execute();
+
+ weChatSinkConnector = new WeChatSinkConnector();
+ WeChatSinkConfig weChatSinkConfig = (WeChatSinkConfig)
ConfigUtil.parse(weChatSinkConnector.configClass());
+ weChatSinkConnector.init(weChatSinkConfig);
+ // init() builds a real client; overwrite the private field with the mock by reflection.
+ Field clientField =
ReflectionSupport.findFields(weChatSinkConnector.getClass(),
+ (f) -> f.getName().equals("okHttpClient"),
+ HierarchyTraversalMode.BOTTOM_UP).get(0);
+ clientField.setAccessible(true);
+ clientField.set(weChatSinkConnector, okHttpClient);
+ weChatSinkConnector.start();
+ }
+
+ /**
+ * Puts the records into the connector and verifies the number of HTTP calls created on the mocked client.
+ */
+ @Test
+ public void testSendMessageToWeChat() throws Exception {
+ final int times = 1;
+ List<ConnectRecord> records = new ArrayList<>();
+ for (int i = 0; i < times; i++) {
+ RecordPartition partition = new RecordPartition();
+ RecordOffset offset = new RecordOffset();
+ ConnectRecord connectRecord = new ConnectRecord(partition, offset,
+ System.currentTimeMillis(), "Hello,
EventMesh!".getBytes(StandardCharsets.UTF_8));
+ records.add(connectRecord);
+ }
+
+ weChatSinkConnector.put(records);
+ // One call per record plus one more -- presumably the single access-token request, which is cached afterwards.
+ verify(okHttpClient, times(times + 1)).newCall(any(Request.class));
+ }
+
+ /** Stops the connector after each test. */
+ @AfterEach
+ public void tearDown() throws IOException {
+ weChatSinkConnector.stop();
+ }
+
+}
\ No newline at end of file
diff --git
a/eventmesh-connectors/eventmesh-connector-wechat/src/test/resources/server-config.yml
b/eventmesh-connectors/eventmesh-connector-wechat/src/test/resources/server-config.yml
new file mode 100644
index 000000000..20d6e6d59
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wechat/src/test/resources/server-config.yml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# When true, WeChatConnectServer starts the WeChat sink connector.
+sinkEnable: true
diff --git
a/eventmesh-connectors/eventmesh-connector-wechat/src/test/resources/sink-config.yml
b/eventmesh-connectors/eventmesh-connector-wechat/src/test/resources/sink-config.yml
new file mode 100644
index 000000000..6f8face36
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-wechat/src/test/resources/sink-config.yml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Settings inherited from the generic sink configuration
+# (presumably the EventMesh runtime address and subscription identity -- the schema is defined outside this module).
+pubSubConfig:
+ meshAddress: 127.0.0.1:10000
+ subject: TEST-TOPIC-WECHAT
+ idc: FT
+ env: PRD
+ group: weChatSink
+ appId: 5034
+ userName: weChatSinkUser
+ passWord: weChatPassWord
+# WeChat-specific settings read by WeChatSinkConnectorTest; the HTTP client is mocked there,
+# so these credentials are never sent to WeChat.
+sinkConnectorConfig:
+ connectorName: weChatSink
+ appId: weChatAppId
+ appSecret: weChatAppSecret
diff --git a/settings.gradle b/settings.gradle
index b95ff0ddc..a60f8d862 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -48,6 +48,7 @@ include 'eventmesh-connectors:eventmesh-connector-dingtalk'
include 'eventmesh-connectors:eventmesh-connector-feishu'
include 'eventmesh-connectors:eventmesh-connector-wecom'
include 'eventmesh-connectors:eventmesh-connector-slack'
+include 'eventmesh-connectors:eventmesh-connector-wechat'
include 'eventmesh-storage-plugin:eventmesh-storage-api'
include 'eventmesh-storage-plugin:eventmesh-storage-standalone'
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]