This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e4e8f7fbf [Improve][Connector-V2] Format SeaTunnelRow use
seatunnel-format-json (#2435)
e4e8f7fbf is described below
commit e4e8f7fbff51a1159ca5bd37be0666961af61d68
Author: TyrantLucifer <[email protected]>
AuthorDate: Wed Aug 17 09:38:47 2022 +0800
[Improve][Connector-V2] Format SeaTunnelRow use seatunnel-format-json
(#2435)
---
.../connector-http/connector-http-base/pom.xml | 6 ++++++
.../connectors/seatunnel/http/sink/HttpSinkWriter.java | 15 ++++++---------
2 files changed, 12 insertions(+), 9 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
b/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
index 521cb2bfa..2a062f596 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
@@ -36,6 +36,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
index de02bcd9d..e68a1c471 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
@@ -17,19 +17,19 @@
package org.apache.seatunnel.connectors.seatunnel.http.sink;
+import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import
org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.HashMap;
import java.util.Objects;
public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
@@ -37,21 +37,18 @@ public class HttpSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
protected final HttpClientProvider httpClient =
HttpClientProvider.getInstance();
protected final SeaTunnelRowType seaTunnelRowType;
protected final HttpParameter httpParameter;
+ protected final SerializationSchema serializationSchema;
public HttpSinkWriter(SeaTunnelRowType seaTunnelRowType, HttpParameter
httpParameter) {
this.seaTunnelRowType = seaTunnelRowType;
this.httpParameter = httpParameter;
+ this.serializationSchema = new
JsonSerializationSchema(seaTunnelRowType);
}
@Override
public void write(SeaTunnelRow element) throws IOException {
- ObjectMapper objectMapper = new ObjectMapper();
- HashMap<Object, Object> objectMap = new HashMap<>();
- int totalFields = seaTunnelRowType.getTotalFields();
- for (int i = 0; i < totalFields; i++) {
- objectMap.put(seaTunnelRowType.getFieldName(i),
element.getField(i));
- }
- String body = objectMapper.writeValueAsString(objectMap);
+ byte[] serialize = serializationSchema.serialize(element);
+ String body = new String(serialize);
try {
// only support post web hook
HttpResponse response = httpClient.doPost(httpParameter.getUrl(),
httpParameter.getHeaders(), body);