This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new d463fa713 [ISSUE #4410]add wechat sink connector (#4594)
d463fa713 is described below

commit d463fa71310fad03798941d03c8065829c14e38b
Author: wizardzhang <[email protected]>
AuthorDate: Sun Dec 3 17:42:46 2023 +0800

    [ISSUE #4410]add wechat sink connector (#4594)
    
    * add wechat connector module and dependency
    
    * add wechat connector
    
    * add unit test
    
    * fix check style error and downgrade okhttp version to 3.14.9
    
    * add stop method after test end
    
    * fix check style
---
 build.gradle                                       |   2 +
 .../eventmesh-connector-wechat/build.gradle        |  42 +++++
 .../wechat/config/WeChatConnectServerConfig.java   |  30 +++
 .../wechat/server/WeChatConnectServer.java         |  38 ++++
 .../wechat/sink/config/SinkConnectorConfig.java    |  30 +++
 .../wechat/sink/config/WeChatSinkConfig.java       |  30 +++
 .../sink/connector/TemplateMessageResponse.java    |  30 +++
 .../wechat/sink/connector/WeChatSinkConnector.java | 202 +++++++++++++++++++++
 .../src/main/resources/server-config.yml           |  18 ++
 .../src/main/resources/sink-config.yml             |  30 +++
 .../sink/connector/WeChatSinkConnectorTest.java    | 130 +++++++++++++
 .../src/test/resources/server-config.yml           |  18 ++
 .../src/test/resources/sink-config.yml             |  30 +++
 settings.gradle                                    |   1 +
 14 files changed, 631 insertions(+)

diff --git a/build.gradle b/build.gradle
index 798e5ad5f..2c11cb437 100644
--- a/build.gradle
+++ b/build.gradle
@@ -499,6 +499,8 @@ subprojects {
             dependency 
"com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.0"
             dependency 
"com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.0"
 
+            dependency "com.squareup.okhttp3:okhttp:3.14.9"
+
             dependency "org.asynchttpclient:async-http-client:2.12.0"
             dependency "org.apache.httpcomponents:httpclient:4.5.13"
 
diff --git a/eventmesh-connectors/eventmesh-connector-wechat/build.gradle 
b/eventmesh-connectors/eventmesh-connector-wechat/build.gradle
new file mode 100644
index 000000000..c79152448
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-wechat/build.gradle
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Logging bindings excluded from this module's configurations.
+configurations {
+    implementation.exclude group: 'ch.qos.logback', module: 'logback-classic'
+    implementation.exclude group: 'log4j', module: 'log4j'
+    testImplementation.exclude group: 'org.apache.logging.log4j', module: 
'log4j-to-slf4j'
+}
+
+// No versions are given here; e.g. the okhttp version (3.14.9) is declared in the root build.gradle.
+dependencies {
+    // EventMesh modules the connector is built on.
+    implementation project(":eventmesh-common")
+    implementation project(":eventmesh-sdks:eventmesh-sdk-java")
+    implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
+
+    // Used by WeChatSinkConnector: JSON parsing, the access-token cache and the HTTP client.
+    implementation 'com.alibaba:fastjson'
+    implementation 'com.google.guava:guava'
+    implementation 'com.squareup.okhttp3:okhttp'
+
+    implementation "io.netty:netty-all"
+
+
+    // Lombok is only needed at compile time (annotation processing).
+    compileOnly 'org.projectlombok:lombok'
+    annotationProcessor 'org.projectlombok:lombok'
+
+    // Mockito is used by the unit test to mock the OkHttp client.
+    testImplementation "org.mockito:mockito-core"
+    testImplementation "org.mockito:mockito-junit-jupiter"
+    testImplementation "org.mockito:mockito-inline"
+}
\ No newline at end of file
diff --git 
a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/config/WeChatConnectServerConfig.java
 
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/config/WeChatConnectServerConfig.java
new file mode 100644
index 000000000..a2634b6b4
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/config/WeChatConnectServerConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wechat.config;
+
+import org.apache.eventmesh.openconnect.api.config.Config;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/**
+ * Server-level configuration of the WeChat connector.
+ *
+ * <p>{@code WeChatConnectServer#main} parses it from the file named by
+ * {@code Constants.CONNECT_SERVER_CONFIG_FILE_NAME}.
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class WeChatConnectServerConfig extends Config {
+
+    /** The sink connector is started by {@code WeChatConnectServer#main} only when this flag is true. */
+    private boolean sinkEnable;
+}
\ No newline at end of file
diff --git 
a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/server/WeChatConnectServer.java
 
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/server/WeChatConnectServer.java
new file mode 100644
index 000000000..0ea1927c1
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/server/WeChatConnectServer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wechat.server;
+
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.connector.wechat.config.WeChatConnectServerConfig;
+import 
org.apache.eventmesh.connector.wechat.sink.connector.WeChatSinkConnector;
+import org.apache.eventmesh.openconnect.Application;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+/**
+ * Entry point of the WeChat connector process.
+ */
+public class WeChatConnectServer {
+
+    /**
+     * Loads the server configuration and, when the sink is enabled, runs {@link WeChatSinkConnector}.
+     *
+     * @param args command-line arguments (unused)
+     * @throws Exception if the configuration cannot be parsed or the application fails to run
+     */
+    public static void main(String[] args) throws Exception {
+        final WeChatConnectServerConfig serverConfig =
+            ConfigUtil.parse(WeChatConnectServerConfig.class, Constants.CONNECT_SERVER_CONFIG_FILE_NAME);
+
+        // Nothing to start when the sink is switched off.
+        if (!serverConfig.isSinkEnable()) {
+            return;
+        }
+
+        new Application().run(WeChatSinkConnector.class);
+    }
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/config/SinkConnectorConfig.java
 
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/config/SinkConnectorConfig.java
new file mode 100644
index 000000000..e575e65c6
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/config/SinkConnectorConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wechat.sink.config;
+
+import lombok.Data;
+
+/**
+ * Connector-specific settings of the WeChat sink, nested inside {@code WeChatSinkConfig}.
+ */
+@Data
+public class SinkConnectorConfig {
+
+    /** Name reported by {@code WeChatSinkConnector#name()}. */
+    private String connectorName;
+
+    /** Substituted as the {@code appid} query parameter of the access-token request. */
+    private String appId;
+
+    /** Substituted as the {@code secret} query parameter of the access-token request. */
+    private String appSecret;
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/config/WeChatSinkConfig.java
 
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/config/WeChatSinkConfig.java
new file mode 100644
index 000000000..a77e1c77b
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/config/WeChatSinkConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wechat.sink.config;
+
+import org.apache.eventmesh.openconnect.api.config.SinkConfig;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/**
+ * Sink configuration of the WeChat connector: the inherited {@link SinkConfig} settings plus the
+ * WeChat-specific {@link SinkConnectorConfig}. Returned by {@code WeChatSinkConnector#configClass()}.
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class WeChatSinkConfig extends SinkConfig {
+
+    /** Connector name and WeChat credentials read by {@code WeChatSinkConnector}. */
+    private SinkConnectorConfig sinkConnectorConfig;
+}
\ No newline at end of file
diff --git 
a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/TemplateMessageResponse.java
 
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/TemplateMessageResponse.java
new file mode 100644
index 000000000..3d51704e0
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/TemplateMessageResponse.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wechat.sink.connector;
+
+import lombok.Data;
+
+/**
+ * JSON body returned by the WeChat message-send endpoint, e.g.
+ * {@code {"errcode":0,"errmsg":"ok","msgid":200228332}}.
+ */
+@Data
+public class TemplateMessageResponse {
+
+    /**
+     * Result code, bound to the JSON key {@code errcode}. The connector treats any non-zero value as a failure.
+     *
+     * <p>The field used to be spelled {@code errocode}, which never matched the JSON key, so the value
+     * always stayed 0 and failed sends were reported as successful.
+     */
+    private int errcode;
+
+    /** Human-readable result message. */
+    private String errmsg;
+
+    /** Identifier of the sent message. */
+    private String msgid;
+
+    /**
+     * Returns the result code.
+     *
+     * @return the value of {@code errcode}
+     * @deprecated misspelled accessor kept so existing callers keep compiling; use {@code getErrcode()}.
+     */
+    @Deprecated
+    public int getErrocode() {
+        return errcode;
+    }
+
+    /**
+     * Sets the result code.
+     *
+     * @param errocode the result code
+     * @deprecated misspelled mutator kept so existing callers keep compiling; use {@code setErrcode(int)}.
+     */
+    @Deprecated
+    public void setErrocode(int errocode) {
+        this.errcode = errocode;
+    }
+}
\ No newline at end of file
diff --git 
a/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java
new file mode 100644
index 000000000..0457202b6
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnector.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wechat.sink.connector;
+
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.connector.wechat.sink.config.WeChatSinkConfig;
+import org.apache.eventmesh.openconnect.api.config.Config;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
+import org.apache.eventmesh.openconnect.api.sink.Sink;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+
+/**
+ * Sink connector that delivers EventMesh records to WeChat through the template-message HTTP API.
+ *
+ * <p>For each record handed to {@link #put(List)} the connector obtains an access token (cached in
+ * {@link #ACCESS_TOKEN_CACHE}) and POSTs the record payload as a JSON body to the message-send endpoint.
+ * A failure on one record is logged and does not stop the remaining records from being sent.
+ *
+ * <p>WeChat doc: <a href="https://developer.work.weixin.qq.com/document/path/90236">...</a>
+ */
+@Slf4j
+public class WeChatSinkConnector implements Sink {
+
+    /**
+     * Access-token cache shared by all connector instances; entries expire 120 minutes after being written.
+     *
+     * <p>NOTE(review): the cache is static and uses a single fixed key, so connectors configured with
+     * different app ids in one JVM would share a token - confirm only one WeChat account is used per process.
+     */
+    public static final Cache<String, String> ACCESS_TOKEN_CACHE = CacheBuilder.newBuilder()
+        .initialCapacity(12)
+        .maximumSize(10)
+        .concurrencyLevel(5)
+        .expireAfterWrite(120, TimeUnit.MINUTES)
+        .build();
+
+    /** Key of the cached token; also the name of the token field in the token endpoint's JSON response. */
+    public static final String ACCESS_TOKEN_CACHE_KEY = "access_token";
+
+    private static final String ACCESS_TOKEN_URL =
+        "https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid=%s&secret=%s";
+
+    private static final String MESSAGE_SEND_URL =
+        "https://api.weixin.qq.com/cgi-bin/message/template/send?access_token=%s";
+
+    private static final MediaType JSON_MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8");
+
+    private OkHttpClient okHttpClient;
+
+    private WeChatSinkConfig sinkConfig;
+
+    private volatile boolean isRunning = false;
+
+    @Override
+    public Class<? extends Config> configClass() {
+        return WeChatSinkConfig.class;
+    }
+
+    /**
+     * Initializes the connector from an already parsed configuration.
+     *
+     * @param config must be a {@link WeChatSinkConfig}
+     */
+    @Override
+    public void init(Config config) {
+        this.sinkConfig = (WeChatSinkConfig) config;
+        this.okHttpClient = buildHttpClient();
+    }
+
+    /**
+     * Initializes the connector from a connector context.
+     *
+     * @param connectorContext must be a {@link SinkConnectorContext} carrying a {@link WeChatSinkConfig}
+     */
+    @Override
+    public void init(ConnectorContext connectorContext) {
+        SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext;
+        this.sinkConfig = (WeChatSinkConfig) sinkConnectorContext.getSinkConfig();
+        this.okHttpClient = buildHttpClient();
+    }
+
+    @Override
+    public void start() {
+        isRunning = true;
+    }
+
+    @Override
+    public void commit(ConnectRecord record) {
+
+    }
+
+    @Override
+    public String name() {
+        return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
+    }
+
+    @Override
+    public void stop() throws IOException {
+        isRunning = false;
+    }
+
+    public boolean isRunning() {
+        return isRunning;
+    }
+
+    /**
+     * Sends every record to WeChat. Records without data are skipped with a warning; a record that fails
+     * to send is logged and the loop continues with the next one.
+     *
+     * @param sinkRecords records to deliver
+     */
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) {
+        for (ConnectRecord record : sinkRecords) {
+            try {
+                if (Objects.isNull(record.getData())) {
+                    log.warn("ConnectRecord data is null, ignore.");
+                    continue;
+                }
+                sendMessage(record);
+            } catch (Exception e) {
+                log.error("Failed to sink message to WeChat.", e);
+            }
+        }
+    }
+
+    /** Builds the HTTP client shared by both {@code init} overloads (60 second timeouts, connection retry on). */
+    private static OkHttpClient buildHttpClient() {
+        return new OkHttpClient.Builder()
+            .connectTimeout(60, TimeUnit.SECONDS)
+            .readTimeout(60, TimeUnit.SECONDS)
+            .writeTimeout(60, TimeUnit.SECONDS)
+            .retryOnConnectionFailure(true)
+            .build();
+    }
+
+    /** Converts the record data to the JSON text to send; byte arrays are decoded as UTF-8. */
+    private static String payloadOf(ConnectRecord record) {
+        Object data = record.getData();
+        if (data instanceof byte[]) {
+            // Decode explicitly as UTF-8 (matches the request media type) instead of the platform charset.
+            return new String((byte[]) data, StandardCharsets.UTF_8);
+        }
+        if (data instanceof String) {
+            return (String) data;
+        }
+        throw new IllegalArgumentException("Unsupported ConnectRecord data type: " + data.getClass().getName());
+    }
+
+    /**
+     * POSTs one record to the message-send endpoint and checks both the HTTP status and the result code
+     * in the response body.
+     *
+     * @throws IOException if the call fails, the response is empty, or WeChat reports an error code
+     */
+    private void sendMessage(ConnectRecord record) throws IOException {
+        String accessToken = getAccessToken();
+        RequestBody body = RequestBody.create(JSON_MEDIA_TYPE, payloadOf(record));
+        Request request = new Request.Builder()
+            .url(String.format(MESSAGE_SEND_URL, accessToken))
+            .post(body)
+            .build();
+        try (Response response = okHttpClient.newCall(request).execute()) {
+            if (!response.isSuccessful()) {
+                // NOTE(review): the dumped response includes the request URL, which carries the access token.
+                log.error("server response: {}", ToStringBuilder.reflectionToString(response));
+                throw new IOException("Unexpected code " + response);
+            }
+
+            ResponseBody responseBody = response.body();
+            if (responseBody == null) {
+                throw new IOException("Response body is null.");
+            }
+
+            String jsonStr = responseBody.string();
+            TemplateMessageResponse messageResponse = JsonUtils.parseObject(jsonStr, TemplateMessageResponse.class);
+            if (messageResponse == null) {
+                throw new IOException("message response is null.");
+            }
+
+            if (messageResponse.getErrocode() != 0) {
+                throw new IOException(String.format("Send message to WeChat error! errorCode=%s, errorMessage=%s",
+                    messageResponse.getErrocode(), messageResponse.getErrmsg()));
+            }
+        }
+    }
+
+    /** Returns the cached access token, fetching it from WeChat when no token is cached. */
+    @SneakyThrows
+    private String getAccessToken() {
+        // Cache#get stores the loader's result itself, so the loader must not write to the cache.
+        return ACCESS_TOKEN_CACHE.get(ACCESS_TOKEN_CACHE_KEY, this::fetchAccessToken);
+    }
+
+    /**
+     * Requests a new access token using the configured app id and app secret.
+     *
+     * @throws IOException if the call fails or the response does not contain a token
+     */
+    private String fetchAccessToken() throws IOException {
+        Request tokenRequest = new Request.Builder()
+            .url(String.format(ACCESS_TOKEN_URL, sinkConfig.getSinkConnectorConfig().getAppId(),
+                sinkConfig.getSinkConnectorConfig().getAppSecret()))
+            .get()
+            .build();
+        try (Response response = okHttpClient.newCall(tokenRequest).execute()) {
+            if (!response.isSuccessful()) {
+                // NOTE(review): the dumped response includes the request URL, which carries the app secret.
+                log.error("server response: {}", ToStringBuilder.reflectionToString(response));
+                throw new IOException("Unexpected code " + response);
+            }
+
+            ResponseBody responseBody = response.body();
+            if (responseBody == null) {
+                throw new IOException("Response body is null.");
+            }
+
+            JSONObject jsonObject = JSON.parseObject(responseBody.string());
+            if (jsonObject == null) {
+                throw new IOException("Access token response is empty.");
+            }
+
+            String accessToken = jsonObject.getString(ACCESS_TOKEN_CACHE_KEY);
+            if (accessToken == null || accessToken.isEmpty()) {
+                // Without a token the body is expected to describe the error; report it instead of caching null.
+                throw new IOException(String.format("Get WeChat access token error! errorCode=%s, errorMessage=%s",
+                    jsonObject.getString("errcode"), jsonObject.getString("errmsg")));
+            }
+            return accessToken;
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/eventmesh-connectors/eventmesh-connector-wechat/src/main/resources/server-config.yml
 
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/resources/server-config.yml
new file mode 100644
index 000000000..20d6e6d59
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/resources/server-config.yml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# When true, WeChatConnectServer starts the WeChat sink connector.
+sinkEnable: true
diff --git 
a/eventmesh-connectors/eventmesh-connector-wechat/src/main/resources/sink-config.yml
 
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/resources/sink-config.yml
new file mode 100644
index 000000000..6f8face36
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wechat/src/main/resources/sink-config.yml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Not read by the connector code in this module; presumably bound by the inherited SinkConfig - confirm.
+pubSubConfig:
+    meshAddress: 127.0.0.1:10000
+    subject: TEST-TOPIC-WECHAT
+    idc: FT
+    env: PRD
+    group: weChatSink
+    appId: 5034
+    userName: weChatSinkUser
+    passWord: weChatPassWord
+# Bound to SinkConnectorConfig: connectorName is the connector's name, appId/appSecret are sent
+# to the WeChat access-token endpoint (this appId is unrelated to pubSubConfig.appId above).
+sinkConnectorConfig:
+    connectorName: weChatSink
+    appId: weChatAppId
+    appSecret: weChatAppSecret
diff --git 
a/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java
 
b/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java
new file mode 100644
index 000000000..2c4ad5b01
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.wechat.sink.connector;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.eventmesh.connector.wechat.sink.config.WeChatSinkConfig;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.platform.commons.support.HierarchyTraversalMode;
+import org.junit.platform.commons.support.ReflectionSupport;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import okhttp3.Call;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Protocol;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+
+
+
+/**
+ * Unit test for {@link WeChatSinkConnector}. The connector's HTTP client is replaced by a Mockito mock
+ * that answers the token request and the message-send request with canned JSON responses.
+ */
+@ExtendWith(MockitoExtension.class)
+public class WeChatSinkConnectorTest {
+
+    private static final String TOKEN_URL = "https://api.weixin.qq.com/cgi-bin/token";
+
+    private static final String SEND_MESSAGE_URL = "https://api.weixin.qq.com/cgi-bin/message/template/send";
+
+    private WeChatSinkConnector weChatSinkConnector;
+
+    @Mock
+    private OkHttpClient okHttpClient;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        // The token cache is static: start from an empty cache so the token endpoint is hit exactly once.
+        WeChatSinkConnector.ACCESS_TOKEN_CACHE.invalidateAll();
+
+        stubResponse(TOKEN_URL, "{\"access_token\":\"ACCESS_TOKEN\",\"expires_in\":7200}");
+        stubResponse(SEND_MESSAGE_URL, "{\"errcode\":0,\"errmsg\":\"ok\",\"msgid\":200228332}");
+
+        weChatSinkConnector = new WeChatSinkConnector();
+        WeChatSinkConfig weChatSinkConfig = (WeChatSinkConfig) ConfigUtil.parse(weChatSinkConnector.configClass());
+        weChatSinkConnector.init(weChatSinkConfig);
+
+        // init() builds a real client; swap in the mock through reflection because the field is private.
+        Field clientField = ReflectionSupport.findFields(weChatSinkConnector.getClass(),
+            (f) -> f.getName().equals("okHttpClient"),
+            HierarchyTraversalMode.BOTTOM_UP).get(0);
+        clientField.setAccessible(true);
+        clientField.set(weChatSinkConnector, okHttpClient);
+        weChatSinkConnector.start();
+    }
+
+    @Test
+    public void testSendMessageToWeChat() throws Exception {
+        final int times = 1;
+        List<ConnectRecord> records = new ArrayList<>();
+        for (int i = 0; i < times; i++) {
+            RecordPartition partition = new RecordPartition();
+            RecordOffset offset = new RecordOffset();
+            ConnectRecord connectRecord = new ConnectRecord(partition, offset,
+                System.currentTimeMillis(), "Hello, EventMesh!".getBytes(StandardCharsets.UTF_8));
+            records.add(connectRecord);
+        }
+
+        weChatSinkConnector.put(records);
+        // One call per record plus a single call that fetches the access token.
+        verify(okHttpClient, times(times + 1)).newCall(any(Request.class));
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        weChatSinkConnector.stop();
+    }
+
+    /**
+     * Makes the mocked client answer any request whose path equals the path of {@code url} with an
+     * HTTP 200 response carrying {@code responseJson}.
+     */
+    private void stubResponse(String url, String responseJson) throws IOException {
+        Request request = new Request.Builder().url(url).build();
+        ResponseBody responseBody = ResponseBody.create(MediaType.parse("application/json; charset=utf-8"),
+            responseJson);
+        Response response = new Response.Builder()
+            .request(request)
+            .protocol(Protocol.HTTP_1_0)
+            .message("ok")
+            .code(200)
+            .body(responseBody)
+            .build();
+
+        String expectedPath = request.url().encodedPath();
+        ArgumentMatcher<Request> samePath = (candidate) -> expectedPath.equals(candidate.url().encodedPath());
+        Call call = Mockito.mock(Call.class);
+        Mockito.doReturn(call).when(okHttpClient).newCall(Mockito.argThat(samePath));
+        Mockito.doReturn(response).when(call).execute();
+    }
+
+}
\ No newline at end of file
diff --git 
a/eventmesh-connectors/eventmesh-connector-wechat/src/test/resources/server-config.yml
 
b/eventmesh-connectors/eventmesh-connector-wechat/src/test/resources/server-config.yml
new file mode 100644
index 000000000..20d6e6d59
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wechat/src/test/resources/server-config.yml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# When true, WeChatConnectServer starts the WeChat sink connector.
+sinkEnable: true
diff --git 
a/eventmesh-connectors/eventmesh-connector-wechat/src/test/resources/sink-config.yml
 
b/eventmesh-connectors/eventmesh-connector-wechat/src/test/resources/sink-config.yml
new file mode 100644
index 000000000..6f8face36
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-wechat/src/test/resources/sink-config.yml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Not read by the connector code in this module; presumably bound by the inherited SinkConfig - confirm.
+pubSubConfig:
+    meshAddress: 127.0.0.1:10000
+    subject: TEST-TOPIC-WECHAT
+    idc: FT
+    env: PRD
+    group: weChatSink
+    appId: 5034
+    userName: weChatSinkUser
+    passWord: weChatPassWord
+# Bound to SinkConnectorConfig: connectorName is the connector's name, appId/appSecret are sent
+# to the WeChat access-token endpoint (this appId is unrelated to pubSubConfig.appId above).
+sinkConnectorConfig:
+    connectorName: weChatSink
+    appId: weChatAppId
+    appSecret: weChatAppSecret
diff --git a/settings.gradle b/settings.gradle
index b95ff0ddc..a60f8d862 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -48,6 +48,7 @@ include 'eventmesh-connectors:eventmesh-connector-dingtalk'
 include 'eventmesh-connectors:eventmesh-connector-feishu'
 include 'eventmesh-connectors:eventmesh-connector-wecom'
 include 'eventmesh-connectors:eventmesh-connector-slack'
+include 'eventmesh-connectors:eventmesh-connector-wechat'
 
 include 'eventmesh-storage-plugin:eventmesh-storage-api'
 include 'eventmesh-storage-plugin:eventmesh-storage-standalone'


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to