This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3e200e0a3  [Feature][Connector-V2]  Add Enterprise Wechat sink 
connector (#2412)
3e200e0a3 is described below

commit 3e200e0a38b1358c025b7c5e08572eccfd10fbb1
Author: 531651225 <[email protected]>
AuthorDate: Wed Aug 17 10:35:46 2022 +0800

     [Feature][Connector-V2]  Add Enterprise Wechat sink connector (#2412)
    
    * [Feature][Connector-V2] Add Enterprise WeChart source output format
---
 docs/en/connector-v2/sink/Enterprise-WeChat.md     | 53 ++++++++++++++++
 seatunnel-connectors-v2-dist/pom.xml               |  5 ++
 .../{ => connector-http-wechat}/pom.xml            | 16 ++---
 .../wechat/sink/WeChatHttpSinkWriter.java          | 70 ++++++++++++++++++++++
 .../seatunnel/wechat/sink/WeChatSink.java          | 60 +++++++++++++++++++
 .../wechat/sink/config/WeChatSinkConfig.java       | 45 ++++++++++++++
 seatunnel-connectors-v2/connector-http/pom.xml     |  1 +
 7 files changed, 243 insertions(+), 7 deletions(-)

diff --git a/docs/en/connector-v2/sink/Enterprise-WeChat.md 
b/docs/en/connector-v2/sink/Enterprise-WeChat.md
new file mode 100644
index 000000000..303648212
--- /dev/null
+++ b/docs/en/connector-v2/sink/Enterprise-WeChat.md
@@ -0,0 +1,53 @@
+# Enterprise WeChat
+
+> Enterprise WeChat sink connector
+
+## Description
+
+A sink plugin which uses an Enterprise WeChat robot to send messages
+> For example, if the data from upstream is [`"alarmStatus": "firing", 
"alarmTime": "2022-08-03 01:38:49","alarmContent": "The disk usage exceeds the 
threshold"`], the output content to WeChat Robot is the following:
+> ```
+> alarmStatus: firing 
+> alarmTime: 2022-08-03 01:38:49
+> alarmContent: The disk usage exceeds the threshold
+> ```
+**Tips: WeChat sink only supports `string` webhook and the data from source 
will be treated as body content in the webhook.**
+
+
+##  Options
+
+| name | type   | required | default value |
+| --- |--------|----------| --- |
+| url | String | Yes      | - |
+| mentioned_list | array | No       | - |
+| mentioned_mobile_list | array | No       | - |
+
+### url [string]
+
+Enterprise WeChat webhook url format is 
https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=XXXXXX(string)
+
+### mentioned_list [array]
+
+A list of userids to remind the specified members in the group (@ a member); 
`@all` means to remind everyone. If the developer can't get the userid, they 
can use `mentioned_mobile_list` instead
+
+### mentioned_mobile_list [array]
+
+A list of mobile phone numbers; reminds the group members corresponding to the 
mobile phone numbers (@ a member). `@all` means to remind everyone
+
+## Example
+
+simple:
+
+```hocon
+WeChat {
+        url = 
"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=693axxx6-7aoc-4bc4-97a0-0ec2sifa5aaa";
+    }
+```
+
+```hocon
+WeChat {
+        url = 
"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=693axxx6-7aoc-4bc4-97a0-0ec2sifa5aaa";
+        mentioned_list=["wangqing","@all"]
+        mentioned_mobile_list=["13800001111","@all"]
+    }
+```
diff --git a/seatunnel-connectors-v2-dist/pom.xml 
b/seatunnel-connectors-v2-dist/pom.xml
index 00f4af6d5..0f50bba0e 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -61,6 +61,11 @@
             <artifactId>connector-http-feishu</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-http-wechat</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>connector-jdbc</artifactId>
diff --git a/seatunnel-connectors-v2/connector-http/pom.xml 
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/pom.xml
similarity index 77%
copy from seatunnel-connectors-v2/connector-http/pom.xml
copy to seatunnel-connectors-v2/connector-http/connector-http-wechat/pom.xml
index e8fbcadc1..991eea99f 100644
--- a/seatunnel-connectors-v2/connector-http/pom.xml
+++ b/seatunnel-connectors-v2/connector-http/connector-http-wechat/pom.xml
@@ -21,17 +21,19 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <parent>
-        <artifactId>seatunnel-connectors-v2</artifactId>
+        <artifactId>connector-http</artifactId>
         <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <artifactId>connector-http</artifactId>
-    <packaging>pom</packaging>
 
-    <modules>
-        <module>connector-http-base</module>
-        <module>connector-http-feishu</module>
-    </modules>
+    <artifactId>connector-http-wechat</artifactId>
 
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-http-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatHttpSinkWriter.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatHttpSinkWriter.java
new file mode 100644
index 000000000..44ecab013
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatHttpSinkWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.wechat.sink;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.wechat.sink.config.WeChatSinkConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+public class WeChatHttpSinkWriter extends HttpSinkWriter {
+
+    private final WeChatSinkConfig weChatSinkConfig;
+    private final SeaTunnelRowType seaTunnelRowType;
+
+    public WeChatHttpSinkWriter(HttpParameter httpParameter,  Config 
pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        //new SeaTunnelRowType can match SeaTunnelRowWrapper fields sequence
+        super(new SeaTunnelRowType(new 
String[]{WeChatSinkConfig.WECHAT_SEND_MSG_TYPE_KEY, 
WeChatSinkConfig.WECHAT_SEND_MSG_SUPPORT_TYPE}, new 
SeaTunnelDataType[]{BasicType.VOID_TYPE, BasicType.VOID_TYPE}), httpParameter);
+        this.weChatSinkConfig = new WeChatSinkConfig(pluginConfig);
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        StringBuffer stringBuffer = new StringBuffer();
+        int totalFields = seaTunnelRowType.getTotalFields();
+        for (int i = 0; i < totalFields; i++) {
+            stringBuffer.append(seaTunnelRowType.getFieldName(i) + ": " + 
element.getField(i) + "\\n");
+        }
+        if (totalFields > 0) {
+            //remove last empty line
+            stringBuffer.delete(stringBuffer.length() - 2, 
stringBuffer.length());
+        }
+        HashMap<Object, Object> objectMap = new HashMap<>();
+        objectMap.put(WeChatSinkConfig.WECHAT_SEND_MSG_CONTENT_KEY, 
stringBuffer.toString());
+        if (!CollectionUtils.isEmpty(weChatSinkConfig.getMentionedList())) {
+            objectMap.put(WeChatSinkConfig.MENTIONED_LIST, 
weChatSinkConfig.getMentionedList());
+        }
+        if 
(!CollectionUtils.isEmpty(weChatSinkConfig.getMentionedMobileList())) {
+            objectMap.put(WeChatSinkConfig.MENTIONED_MOBILE_LIST, 
weChatSinkConfig.getMentionedMobileList());
+        }
+        //SeaTunnelRowWrapper can used to post wechat web hook
+        SeaTunnelRow wechatRowWrapper = new SeaTunnelRow(new 
Object[]{WeChatSinkConfig.WECHAT_SEND_MSG_SUPPORT_TYPE, objectMap});
+        super.write(wechatRowWrapper);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
new file mode 100644
index 000000000..64b6d97b4
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.wechat.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSink.class)
+public class WeChatSink extends HttpSink {
+
+    @Override
+    public String getPluginName() {
+        return "WeChat";
+    }
+
+    private Config pluginConfig;
+
+    private SeaTunnelRowType seaTunnelRowType;
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        this.pluginConfig = pluginConfig;
+        super.prepare(pluginConfig);
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        super.setTypeInfo(seaTunnelRowType);
+    }
+
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) {
+        return new WeChatHttpSinkWriter(super.httpParameter, pluginConfig, 
seaTunnelRowType);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/config/WeChatSinkConfig.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/config/WeChatSinkConfig.java
new file mode 100644
index 000000000..be2baab60
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/config/WeChatSinkConfig.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.wechat.sink.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Data;
+import lombok.NonNull;
+
+import java.util.List;
+
+@Data
+public class WeChatSinkConfig {
+    public static final String WECHAT_SEND_MSG_SUPPORT_TYPE = "text";
+    public static final String WECHAT_SEND_MSG_TYPE_KEY = "msgtype";
+    public static final String WECHAT_SEND_MSG_CONTENT_KEY = "content";
+    public static final String MENTIONED_LIST = "mentioned_list";
+    public static final String MENTIONED_MOBILE_LIST = "mentioned_mobile_list";
+    private List<String> mentionedList;
+    private List<String> mentionedMobileList;
+
+    public WeChatSinkConfig(@NonNull Config pluginConfig){
+        if (pluginConfig.hasPath(MENTIONED_LIST)) {
+            this.mentionedList = pluginConfig.getStringList(MENTIONED_LIST);
+        }
+        if (pluginConfig.hasPath(MENTIONED_MOBILE_LIST)) {
+            this.mentionedMobileList = 
pluginConfig.getStringList(MENTIONED_MOBILE_LIST);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-http/pom.xml 
b/seatunnel-connectors-v2/connector-http/pom.xml
index e8fbcadc1..5b699bef0 100644
--- a/seatunnel-connectors-v2/connector-http/pom.xml
+++ b/seatunnel-connectors-v2/connector-http/pom.xml
@@ -32,6 +32,7 @@
     <modules>
         <module>connector-http-base</module>
         <module>connector-http-feishu</module>
+        <module>connector-http-wechat</module>
     </modules>
 
 </project>
\ No newline at end of file

Reply via email to