This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3aee11fc1 [Bug][Connector-V2] Fix wechat sink data serialization 
(#2856)
3aee11fc1 is described below

commit 3aee11fc16c918213a81a8462ba2a65697d1aed8
Author: hailin0 <[email protected]>
AuthorDate: Mon Sep 26 20:41:11 2022 +0800

    [Bug][Connector-V2] Fix wechat sink data serialization (#2856)
---
 .../seatunnel/http/sink/HttpSinkWriter.java        |  8 ++-
 .../sink/WeChatBotMessageSerializationSchema.java  | 71 ++++++++++++++++++++++
 .../wechat/sink/WeChatHttpSinkWriter.java          | 70 ---------------------
 .../seatunnel/wechat/sink/WeChatSink.java          | 25 ++------
 4 files changed, 82 insertions(+), 92 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
index 4b90f8cb3..2def75ba7 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
@@ -40,10 +40,16 @@ public class HttpSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
     protected final SerializationSchema serializationSchema;
 
     public HttpSinkWriter(SeaTunnelRowType seaTunnelRowType, HttpParameter 
httpParameter) {
+        this(seaTunnelRowType, httpParameter, new 
JsonSerializationSchema(seaTunnelRowType));
+    }
+
+    public HttpSinkWriter(SeaTunnelRowType seaTunnelRowType,
+                          HttpParameter httpParameter,
+                          SerializationSchema serializationSchema) {
         this.seaTunnelRowType = seaTunnelRowType;
         this.httpParameter = httpParameter;
         this.httpClient = new HttpClientProvider(httpParameter);
-        this.serializationSchema = new 
JsonSerializationSchema(seaTunnelRowType);
+        this.serializationSchema = serializationSchema;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatBotMessageSerializationSchema.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatBotMessageSerializationSchema.java
new file mode 100644
index 000000000..13bb7ddce
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatBotMessageSerializationSchema.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.wechat.sink;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.wechat.sink.config.WeChatSinkConfig;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import lombok.SneakyThrows;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class WeChatBotMessageSerializationSchema implements 
SerializationSchema {
+    private final WeChatSinkConfig weChatSinkConfig;
+    private final SeaTunnelRowType rowType;
+    private final JsonSerializationSchema jsonSerializationSchema;
+
+    public WeChatBotMessageSerializationSchema(WeChatSinkConfig 
weChatSinkConfig,
+                                               SeaTunnelRowType rowType) {
+        this.weChatSinkConfig = weChatSinkConfig;
+        this.rowType = rowType;
+        this.jsonSerializationSchema = new JsonSerializationSchema(rowType);
+    }
+
+    @SneakyThrows
+    @Override
+    public byte[] serialize(SeaTunnelRow row) {
+        StringBuffer stringBuffer = new StringBuffer();
+        int totalFields = rowType.getTotalFields();
+        for (int i = 0; i < totalFields; i++) {
+            stringBuffer.append(rowType.getFieldName(i) + ": " + 
row.getField(i) + "\\n");
+        }
+        if (totalFields > 0) {
+            //remove last empty line
+            stringBuffer.delete(stringBuffer.length() - 2, 
stringBuffer.length());
+        }
+
+        HashMap<Object, Object> content = new HashMap<>();
+        content.put(WeChatSinkConfig.WECHAT_SEND_MSG_CONTENT_KEY, 
stringBuffer.toString());
+        if (!CollectionUtils.isEmpty(weChatSinkConfig.getMentionedList())) {
+            content.put(WeChatSinkConfig.MENTIONED_LIST, 
weChatSinkConfig.getMentionedList());
+        }
+        if 
(!CollectionUtils.isEmpty(weChatSinkConfig.getMentionedMobileList())) {
+            content.put(WeChatSinkConfig.MENTIONED_MOBILE_LIST, 
weChatSinkConfig.getMentionedMobileList());
+        }
+
+        Map<String, Object> wechatMessage = new HashMap<>();
+        wechatMessage.put(WeChatSinkConfig.WECHAT_SEND_MSG_TYPE_KEY, 
WeChatSinkConfig.WECHAT_SEND_MSG_SUPPORT_TYPE);
+        wechatMessage.put(WeChatSinkConfig.WECHAT_SEND_MSG_SUPPORT_TYPE, 
content);
+        return 
jsonSerializationSchema.getMapper().writeValueAsBytes(wechatMessage);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatHttpSinkWriter.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatHttpSinkWriter.java
deleted file mode 100644
index 44ecab013..000000000
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatHttpSinkWriter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.wechat.sink;
-
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
-import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSinkWriter;
-import 
org.apache.seatunnel.connectors.seatunnel.wechat.sink.config.WeChatSinkConfig;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.commons.collections4.CollectionUtils;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-public class WeChatHttpSinkWriter extends HttpSinkWriter {
-
-    private final WeChatSinkConfig weChatSinkConfig;
-    private final SeaTunnelRowType seaTunnelRowType;
-
-    public WeChatHttpSinkWriter(HttpParameter httpParameter,  Config 
pluginConfig, SeaTunnelRowType seaTunnelRowType) {
-        //new SeaTunnelRowType can match SeaTunnelRowWrapper fields sequence
-        super(new SeaTunnelRowType(new 
String[]{WeChatSinkConfig.WECHAT_SEND_MSG_TYPE_KEY, 
WeChatSinkConfig.WECHAT_SEND_MSG_SUPPORT_TYPE}, new 
SeaTunnelDataType[]{BasicType.VOID_TYPE, BasicType.VOID_TYPE}), httpParameter);
-        this.weChatSinkConfig = new WeChatSinkConfig(pluginConfig);
-        this.seaTunnelRowType = seaTunnelRowType;
-    }
-
-    @Override
-    public void write(SeaTunnelRow element) throws IOException {
-        StringBuffer stringBuffer = new StringBuffer();
-        int totalFields = seaTunnelRowType.getTotalFields();
-        for (int i = 0; i < totalFields; i++) {
-            stringBuffer.append(seaTunnelRowType.getFieldName(i) + ": " + 
element.getField(i) + "\\n");
-        }
-        if (totalFields > 0) {
-            //remove last empty line
-            stringBuffer.delete(stringBuffer.length() - 2, 
stringBuffer.length());
-        }
-        HashMap<Object, Object> objectMap = new HashMap<>();
-        objectMap.put(WeChatSinkConfig.WECHAT_SEND_MSG_CONTENT_KEY, 
stringBuffer.toString());
-        if (!CollectionUtils.isEmpty(weChatSinkConfig.getMentionedList())) {
-            objectMap.put(WeChatSinkConfig.MENTIONED_LIST, 
weChatSinkConfig.getMentionedList());
-        }
-        if 
(!CollectionUtils.isEmpty(weChatSinkConfig.getMentionedMobileList())) {
-            objectMap.put(WeChatSinkConfig.MENTIONED_MOBILE_LIST, 
weChatSinkConfig.getMentionedMobileList());
-        }
-        //SeaTunnelRowWrapper can used to post wechat web hook
-        SeaTunnelRow wechatRowWrapper = new SeaTunnelRow(new 
Object[]{WeChatSinkConfig.WECHAT_SEND_MSG_SUPPORT_TYPE, objectMap});
-        super.write(wechatRowWrapper);
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
index 64b6d97b4..908fdcf10 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
@@ -17,15 +17,13 @@
 
 package org.apache.seatunnel.connectors.seatunnel.wechat.sink;
 
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSink;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.wechat.sink.config.WeChatSinkConfig;
 
 import com.google.auto.service.AutoService;
 
@@ -37,24 +35,9 @@ public class WeChatSink extends HttpSink {
         return "WeChat";
     }
 
-    private Config pluginConfig;
-
-    private SeaTunnelRowType seaTunnelRowType;
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        this.pluginConfig = pluginConfig;
-        super.prepare(pluginConfig);
-    }
-
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
-        super.setTypeInfo(seaTunnelRowType);
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) {
-        return new WeChatHttpSinkWriter(super.httpParameter, pluginConfig, 
seaTunnelRowType);
+        return new HttpSinkWriter(seaTunnelRowType, super.httpParameter,
+            new WeChatBotMessageSerializationSchema(new 
WeChatSinkConfig(pluginConfig), seaTunnelRowType));
     }
 }

Reply via email to