This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 793933b6b [Improve][Connector-V2] Http source support user-defined 
schema (#2439)
793933b6b is described below

commit 793933b6b863678d91c2d8146726b5c81c3a22a5
Author: TyrantLucifer <[email protected]>
AuthorDate: Thu Aug 18 11:02:31 2022 +0800

    [Improve][Connector-V2] Http source support user-defined schema (#2439)
    
    * [Improve][Connector-V2] Http source support user-defined schema
---
 docs/en/connector-v2/source/Http.md                | 96 +++++++++++++++++-----
 .../seatunnel/common/schema/SeatunnelSchema.java   |  5 ++
 .../seatunnel/http/config/HttpConfig.java          |  3 +
 .../seatunnel/http/source/HttpSource.java          | 25 +++++-
 .../seatunnel/http/source/HttpSourceReader.java    | 13 ++-
 5 files changed, 116 insertions(+), 26 deletions(-)

diff --git a/docs/en/connector-v2/source/Http.md 
b/docs/en/connector-v2/source/Http.md
index 816e23aa8..507cf64f1 100644
--- a/docs/en/connector-v2/source/Http.md
+++ b/docs/en/connector-v2/source/Http.md
@@ -8,15 +8,19 @@ Used to read data from Http. Both support streaming and batch 
mode.
 
 ##  Options
 
-| name | type   | required | default value |
-| --- |--------| --- | --- |
-| url | String | Yes | - |
-| method | String | No | GET |
-| headers | Map    | No | - |
-| params | Map | No | - |
-| body | String | No | - |
+| name          | type   | required | default value |
+|---------------|--------|----------|---------------|
+| url           | String | Yes      | -             |
+| schema        | Config | No       | -             |
+| schema.fields | Config | No       | -             |
+| format        | String | No       | json          |
+| method        | String | No       | get           |
+| headers       | Map    | No       | -             |
+| params        | Map    | No       | -             |
+| body          | String | No       | -             |
 
 ### url [string]
+
 http request url
 
 ### method [string]
@@ -35,25 +39,77 @@ http params
 
 http body
 
+### format [String]
+
+The format of the upstream data. Currently only `json` and `text` are supported; the default is `json`.
+
+When `format` is `json`, you should also set the `schema` option. For example:
+
+Given the following upstream data:
+
+```json
+
+{"code":  200, "data":  "get success", "success":  true}
+
+```
+
+you should set `schema` as follows:
+
+```hocon
+
+schema {
+    fields {
+        code = int
+        data = string
+        success = boolean
+    }
+}
+
+```
+
+the connector will generate the following data:
+
+| code | data        | success |
+|------|-------------|---------|
+| 200  | get success | true    |
+
+When `format` is `text`, the connector passes the upstream data through unchanged as a single `content` field. For example:
+
+Given the following upstream data:
+
+```json
+
+{"code":  200, "data":  "get success", "success":  true}
+
+```
+
+the connector will generate the following data:
+
+| content |
+|---------|
+| {"code":  200, "data":  "get success", "success":  true}        |
+
+### schema [Config]
+
+#### fields [Config]
+
+The schema fields of the upstream data (see the `format` section above for an example).
+
 ## Example
 
 simple:
 
 ```hocon
 Http {
-        url = "http://localhost/test/query";
-        method = "GET"
-        headers {
-            token = "9e32e859ef044462a257e1fc76730066"
-        }
-        params {
-            id = "1"
-            type = "TEST"
-        }
-        body = "{
-            \"code\": 5945141259552,
-            \"name\": \"test\"
-        }"
+    url = "https://tyrantlucifer.com/api/getDemoData"
+    schema {
+      fields {
+        code = int
+        message = string
+        data = string
+        ok = boolean
+      }
     }
+}
 ```
 
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
index 8acad190c..57beaa8de 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
@@ -36,6 +36,7 @@ import java.util.Map;
 
 public class SeatunnelSchema {
     private static final String FIELD_KEY = "fields";
+    private static final String SIMPLE_SCHEMA_FILED = "content";
     private final SeaTunnelRowType seaTunnelRowType;
 
     private SeatunnelSchema(SeaTunnelRowType seaTunnelRowType) {
@@ -211,6 +212,10 @@ public class SeatunnelSchema {
         return new SeatunnelSchema(seaTunnelRowType);
     }
 
+    public static SeaTunnelRowType buildSimpleTextSchema() {
+        return new SeaTunnelRowType(new String[]{SIMPLE_SCHEMA_FILED}, new 
SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
+    }
+
     public SeaTunnelRowType getSeaTunnelRowType() {
         return seaTunnelRowType;
     }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
index 29d622619..50ca7a00c 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
@@ -24,4 +24,7 @@ public class HttpConfig {
     public static final String HEADERS = "headers";
     public static final String PARAMS = "params";
     public static final String BODY = "body";
+    public static final String SCHEMA = "schema";
+    public static final String FORMAT = "format";
+    public static final String DEFAULT_FORMAT = "json";
 }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index 91d986a34..d65e59e75 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -19,9 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.http.source;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -29,11 +29,13 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -44,6 +46,7 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
     protected final HttpParameter httpParameter = new HttpParameter();
     protected SeaTunnelRowType rowType;
     protected SeaTunnelContext seaTunnelContext;
+    protected DeserializationSchema<SeaTunnelRow> deserializationSchema;
 
     @Override
     public String getPluginName() {
@@ -62,8 +65,22 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
         }
         this.httpParameter.buildWithConfig(pluginConfig);
-        // TODO support user custom row type
-        this.rowType = new SeaTunnelRowType(new String[]{"content"}, new 
SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
+        if (pluginConfig.hasPath(HttpConfig.SCHEMA)) {
+            Config schema = pluginConfig.getConfig(HttpConfig.SCHEMA);
+            this.rowType = 
SeatunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+        } else {
+            this.rowType = SeatunnelSchema.buildSimpleTextSchema();
+        }
+        // TODO: use format SPI
+        // default use json format
+        String format;
+        if (pluginConfig.hasPath(HttpConfig.FORMAT)) {
+            format = pluginConfig.getString(HttpConfig.FORMAT);
+            this.deserializationSchema = null;
+        } else {
+            format = HttpConfig.DEFAULT_FORMAT;
+            this.deserializationSchema = new JsonDeserializationSchema(false, 
false, rowType);
+        }
     }
 
     @Override
@@ -78,6 +95,6 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
 
     @Override
     public AbstractSingleSplitReader<SeaTunnelRow> 
createReader(SingleSplitReaderContext readerContext) throws Exception {
-        return new HttpSourceReader(this.httpParameter, readerContext);
+        return new HttpSourceReader(this.httpParameter, readerContext, 
this.deserializationSchema);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index 70d38fcca..5f7ff13df 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.http.source;
 
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -37,10 +38,12 @@ public class HttpSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
     protected final SingleSplitReaderContext context;
     protected final HttpParameter httpParameter;
     protected HttpClientProvider httpClient;
+    protected final DeserializationSchema<SeaTunnelRow> deserializationSchema;
 
-    public HttpSourceReader(HttpParameter httpParameter, 
SingleSplitReaderContext context) {
+    public HttpSourceReader(HttpParameter httpParameter, 
SingleSplitReaderContext context, DeserializationSchema<SeaTunnelRow> 
deserializationSchema) {
         this.context = context;
         this.httpParameter = httpParameter;
+        this.deserializationSchema = deserializationSchema;
     }
 
     @Override
@@ -60,7 +63,13 @@ public class HttpSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
         try {
             HttpResponse response = 
httpClient.execute(this.httpParameter.getUrl(), this.httpParameter.getMethod(), 
this.httpParameter.getHeaders(), this.httpParameter.getParams());
             if (HttpResponse.STATUS_OK == response.getCode()) {
-                output.collect(new SeaTunnelRow(new Object[] 
{response.getContent()}));
+                String content = response.getContent();
+                if (deserializationSchema != null) {
+                    deserializationSchema.deserialize(content.getBytes(), 
output);
+                } else {
+                    // TODO: use seatunnel-text-format
+                    output.collect(new SeaTunnelRow(new Object[]{content}));
+                }
                 return;
             }
             LOGGER.error("http client execute exception, http response status 
code:[{}], content:[{}]", response.getCode(), response.getContent());

Reply via email to