This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3aee11fc1 [Bug][Connector-V2] Fix wechat sink data serialization
(#2856)
3aee11fc1 is described below
commit 3aee11fc16c918213a81a8462ba2a65697d1aed8
Author: hailin0 <[email protected]>
AuthorDate: Mon Sep 26 20:41:11 2022 +0800
[Bug][Connector-V2] Fix wechat sink data serialization (#2856)
---
.../seatunnel/http/sink/HttpSinkWriter.java | 8 ++-
.../sink/WeChatBotMessageSerializationSchema.java | 71 ++++++++++++++++++++++
.../wechat/sink/WeChatHttpSinkWriter.java | 70 ---------------------
.../seatunnel/wechat/sink/WeChatSink.java | 25 ++------
4 files changed, 82 insertions(+), 92 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
index 4b90f8cb3..2def75ba7 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
@@ -40,10 +40,16 @@ public class HttpSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
protected final SerializationSchema serializationSchema;
public HttpSinkWriter(SeaTunnelRowType seaTunnelRowType, HttpParameter
httpParameter) {
+ this(seaTunnelRowType, httpParameter, new
JsonSerializationSchema(seaTunnelRowType));
+ }
+
+ public HttpSinkWriter(SeaTunnelRowType seaTunnelRowType,
+ HttpParameter httpParameter,
+ SerializationSchema serializationSchema) {
this.seaTunnelRowType = seaTunnelRowType;
this.httpParameter = httpParameter;
this.httpClient = new HttpClientProvider(httpParameter);
- this.serializationSchema = new
JsonSerializationSchema(seaTunnelRowType);
+ this.serializationSchema = serializationSchema;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatBotMessageSerializationSchema.java
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatBotMessageSerializationSchema.java
new file mode 100644
index 000000000..13bb7ddce
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatBotMessageSerializationSchema.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.wechat.sink;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.wechat.sink.config.WeChatSinkConfig;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import lombok.SneakyThrows;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class WeChatBotMessageSerializationSchema implements
SerializationSchema {
+ private final WeChatSinkConfig weChatSinkConfig;
+ private final SeaTunnelRowType rowType;
+ private final JsonSerializationSchema jsonSerializationSchema;
+
+ public WeChatBotMessageSerializationSchema(WeChatSinkConfig
weChatSinkConfig,
+ SeaTunnelRowType rowType) {
+ this.weChatSinkConfig = weChatSinkConfig;
+ this.rowType = rowType;
+ this.jsonSerializationSchema = new JsonSerializationSchema(rowType);
+ }
+
+ @SneakyThrows
+ @Override
+ public byte[] serialize(SeaTunnelRow row) {
+ StringBuffer stringBuffer = new StringBuffer();
+ int totalFields = rowType.getTotalFields();
+ for (int i = 0; i < totalFields; i++) {
+ stringBuffer.append(rowType.getFieldName(i) + ": " +
row.getField(i) + "\\n");
+ }
+ if (totalFields > 0) {
+ //remove last empty line
+ stringBuffer.delete(stringBuffer.length() - 2,
stringBuffer.length());
+ }
+
+ HashMap<Object, Object> content = new HashMap<>();
+ content.put(WeChatSinkConfig.WECHAT_SEND_MSG_CONTENT_KEY,
stringBuffer.toString());
+ if (!CollectionUtils.isEmpty(weChatSinkConfig.getMentionedList())) {
+ content.put(WeChatSinkConfig.MENTIONED_LIST,
weChatSinkConfig.getMentionedList());
+ }
+ if
(!CollectionUtils.isEmpty(weChatSinkConfig.getMentionedMobileList())) {
+ content.put(WeChatSinkConfig.MENTIONED_MOBILE_LIST,
weChatSinkConfig.getMentionedMobileList());
+ }
+
+ Map<String, Object> wechatMessage = new HashMap<>();
+ wechatMessage.put(WeChatSinkConfig.WECHAT_SEND_MSG_TYPE_KEY,
WeChatSinkConfig.WECHAT_SEND_MSG_SUPPORT_TYPE);
+ wechatMessage.put(WeChatSinkConfig.WECHAT_SEND_MSG_SUPPORT_TYPE,
content);
+ return
jsonSerializationSchema.getMapper().writeValueAsBytes(wechatMessage);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatHttpSinkWriter.java
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatHttpSinkWriter.java
deleted file mode 100644
index 44ecab013..000000000
---
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatHttpSinkWriter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.wechat.sink;
-
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
-import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSinkWriter;
-import
org.apache.seatunnel.connectors.seatunnel.wechat.sink.config.WeChatSinkConfig;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.commons.collections4.CollectionUtils;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-public class WeChatHttpSinkWriter extends HttpSinkWriter {
-
- private final WeChatSinkConfig weChatSinkConfig;
- private final SeaTunnelRowType seaTunnelRowType;
-
- public WeChatHttpSinkWriter(HttpParameter httpParameter, Config
pluginConfig, SeaTunnelRowType seaTunnelRowType) {
- //new SeaTunnelRowType can match SeaTunnelRowWrapper fields sequence
- super(new SeaTunnelRowType(new
String[]{WeChatSinkConfig.WECHAT_SEND_MSG_TYPE_KEY,
WeChatSinkConfig.WECHAT_SEND_MSG_SUPPORT_TYPE}, new
SeaTunnelDataType[]{BasicType.VOID_TYPE, BasicType.VOID_TYPE}), httpParameter);
- this.weChatSinkConfig = new WeChatSinkConfig(pluginConfig);
- this.seaTunnelRowType = seaTunnelRowType;
- }
-
- @Override
- public void write(SeaTunnelRow element) throws IOException {
- StringBuffer stringBuffer = new StringBuffer();
- int totalFields = seaTunnelRowType.getTotalFields();
- for (int i = 0; i < totalFields; i++) {
- stringBuffer.append(seaTunnelRowType.getFieldName(i) + ": " +
element.getField(i) + "\\n");
- }
- if (totalFields > 0) {
- //remove last empty line
- stringBuffer.delete(stringBuffer.length() - 2,
stringBuffer.length());
- }
- HashMap<Object, Object> objectMap = new HashMap<>();
- objectMap.put(WeChatSinkConfig.WECHAT_SEND_MSG_CONTENT_KEY,
stringBuffer.toString());
- if (!CollectionUtils.isEmpty(weChatSinkConfig.getMentionedList())) {
- objectMap.put(WeChatSinkConfig.MENTIONED_LIST,
weChatSinkConfig.getMentionedList());
- }
- if
(!CollectionUtils.isEmpty(weChatSinkConfig.getMentionedMobileList())) {
- objectMap.put(WeChatSinkConfig.MENTIONED_MOBILE_LIST,
weChatSinkConfig.getMentionedMobileList());
- }
- //SeaTunnelRowWrapper can used to post wechat web hook
- SeaTunnelRow wechatRowWrapper = new SeaTunnelRow(new
Object[]{WeChatSinkConfig.WECHAT_SEND_MSG_SUPPORT_TYPE, objectMap});
- super.write(wechatRowWrapper);
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
index 64b6d97b4..908fdcf10 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
@@ -17,15 +17,13 @@
package org.apache.seatunnel.connectors.seatunnel.wechat.sink;
-import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSink;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSinkWriter;
+import
org.apache.seatunnel.connectors.seatunnel.wechat.sink.config.WeChatSinkConfig;
import com.google.auto.service.AutoService;
@@ -37,24 +35,9 @@ public class WeChatSink extends HttpSink {
return "WeChat";
}
- private Config pluginConfig;
-
- private SeaTunnelRowType seaTunnelRowType;
-
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- this.pluginConfig = pluginConfig;
- super.prepare(pluginConfig);
- }
-
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
- super.setTypeInfo(seaTunnelRowType);
- }
-
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) {
- return new WeChatHttpSinkWriter(super.httpParameter, pluginConfig,
seaTunnelRowType);
+ return new HttpSinkWriter(seaTunnelRowType, super.httpParameter,
+ new WeChatBotMessageSerializationSchema(new
WeChatSinkConfig(pluginConfig), seaTunnelRowType));
}
}