This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 793933b6b [Improve][Connector-V2] Http source support user-defined
schema (#2439)
793933b6b is described below
commit 793933b6b863678d91c2d8146726b5c81c3a22a5
Author: TyrantLucifer <[email protected]>
AuthorDate: Thu Aug 18 11:02:31 2022 +0800
[Improve][Connector-V2] Http source support user-defined schema (#2439)
* [Improve][Connector-V2] Http source support user-defined schema
---
docs/en/connector-v2/source/Http.md | 96 +++++++++++++++++-----
.../seatunnel/common/schema/SeatunnelSchema.java | 5 ++
.../seatunnel/http/config/HttpConfig.java | 3 +
.../seatunnel/http/source/HttpSource.java | 25 +++++-
.../seatunnel/http/source/HttpSourceReader.java | 13 ++-
5 files changed, 116 insertions(+), 26 deletions(-)
diff --git a/docs/en/connector-v2/source/Http.md
b/docs/en/connector-v2/source/Http.md
index 816e23aa8..507cf64f1 100644
--- a/docs/en/connector-v2/source/Http.md
+++ b/docs/en/connector-v2/source/Http.md
@@ -8,15 +8,19 @@ Used to read data from Http. Both support streaming and batch
mode.
## Options
-| name | type | required | default value |
-| --- |--------| --- | --- |
-| url | String | Yes | - |
-| method | String | No | GET |
-| headers | Map | No | - |
-| params | Map | No | - |
-| body | String | No | - |
+| name | type | required | default value |
+|---------------|--------|----------|---------------|
+| url | String | Yes | - |
+| schema | Config | No | - |
+| schema.fields | Config | No | - |
+| format | String | No | json |
+| method | String | No | get |
+| headers | Map | No | - |
+| params | Map | No | - |
+| body | String | No | - |
### url [string]
+
http request url
### method [string]
@@ -35,25 +39,77 @@ http params
http body
+### format [String]
+
+The format of the upstream data. Currently only `json` and `text` are supported; the default is `json`.
+
+When `format` is `json`, you should also set the `schema` option. For example:
+
+Given the following upstream data:
+
+```json
+
+{"code": 200, "data": "get success", "success": true}
+
+```
+
+configure `schema` as follows:
+
+```hocon
+
+schema {
+ fields {
+ code = int
+ data = string
+ success = boolean
+ }
+}
+
+```
+
+The connector will then produce the following output:
+
+| code | data | success |
+|------|-------------|---------|
+| 200 | get success | true |
+
+When `format` is `text`, the connector does not parse the upstream data; it emits the raw response as a single `content` field. For example:
+
+Given the following upstream data:
+
+```json
+
+{"code": 200, "data": "get success", "success": true}
+
+```
+
+The connector will then produce the following output:
+
+| content |
+|---------|
+| {"code": 200, "data": "get success", "success": true} |
+
+### schema [Config]
+
+#### fields [Config]
+
+The fields of the upstream data schema, declared as `field_name = type` pairs (see the example under `format`).
+
## Example
simple:
```hocon
Http {
- url = "http://localhost/test/query"
- method = "GET"
- headers {
- token = "9e32e859ef044462a257e1fc76730066"
- }
- params {
- id = "1"
- type = "TEST"
- }
- body = "{
- \"code\": 5945141259552,
- \"name\": \"test\"
- }"
+ url = "https://tyrantlucifer.com/api/getDemoData"
+ schema {
+ fields {
+ code = int
+ message = string
+ data = string
+ ok = boolean
+ }
}
+}
```
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
index 8acad190c..57beaa8de 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
@@ -36,6 +36,7 @@ import java.util.Map;
public class SeatunnelSchema {
private static final String FIELD_KEY = "fields";
+ private static final String SIMPLE_SCHEMA_FILED = "content";
private final SeaTunnelRowType seaTunnelRowType;
private SeatunnelSchema(SeaTunnelRowType seaTunnelRowType) {
@@ -211,6 +212,10 @@ public class SeatunnelSchema {
return new SeatunnelSchema(seaTunnelRowType);
}
+ public static SeaTunnelRowType buildSimpleTextSchema() {
+ return new SeaTunnelRowType(new String[]{SIMPLE_SCHEMA_FILED}, new
SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
+ }
+
public SeaTunnelRowType getSeaTunnelRowType() {
return seaTunnelRowType;
}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
index 29d622619..50ca7a00c 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
@@ -24,4 +24,7 @@ public class HttpConfig {
public static final String HEADERS = "headers";
public static final String PARAMS = "params";
public static final String BODY = "body";
+ public static final String SCHEMA = "schema";
+ public static final String FORMAT = "format";
+ public static final String DEFAULT_FORMAT = "json";
}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index 91d986a34..d65e59e75 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -19,9 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.http.source;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -29,11 +29,13 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -44,6 +46,7 @@ public class HttpSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
protected final HttpParameter httpParameter = new HttpParameter();
protected SeaTunnelRowType rowType;
protected SeaTunnelContext seaTunnelContext;
+ protected DeserializationSchema<SeaTunnelRow> deserializationSchema;
@Override
public String getPluginName() {
@@ -62,8 +65,22 @@ public class HttpSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
}
this.httpParameter.buildWithConfig(pluginConfig);
- // TODO support user custom row type
- this.rowType = new SeaTunnelRowType(new String[]{"content"}, new
SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
+ if (pluginConfig.hasPath(HttpConfig.SCHEMA)) {
+ Config schema = pluginConfig.getConfig(HttpConfig.SCHEMA);
+ this.rowType =
SeatunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+ } else {
+ this.rowType = SeatunnelSchema.buildSimpleTextSchema();
+ }
+ // TODO: use format SPI
+ // default use json format
+ String format;
+ if (pluginConfig.hasPath(HttpConfig.FORMAT)) {
+ format = pluginConfig.getString(HttpConfig.FORMAT);
+ this.deserializationSchema = null;
+ } else {
+ format = HttpConfig.DEFAULT_FORMAT;
+ this.deserializationSchema = new JsonDeserializationSchema(false,
false, rowType);
+ }
}
@Override
@@ -78,6 +95,6 @@ public class HttpSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
@Override
public AbstractSingleSplitReader<SeaTunnelRow>
createReader(SingleSplitReaderContext readerContext) throws Exception {
- return new HttpSourceReader(this.httpParameter, readerContext);
+ return new HttpSourceReader(this.httpParameter, readerContext,
this.deserializationSchema);
}
}
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index 70d38fcca..5f7ff13df 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.http.source;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -37,10 +38,12 @@ public class HttpSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
protected final SingleSplitReaderContext context;
protected final HttpParameter httpParameter;
protected HttpClientProvider httpClient;
+ protected final DeserializationSchema<SeaTunnelRow> deserializationSchema;
- public HttpSourceReader(HttpParameter httpParameter,
SingleSplitReaderContext context) {
+ public HttpSourceReader(HttpParameter httpParameter,
SingleSplitReaderContext context, DeserializationSchema<SeaTunnelRow>
deserializationSchema) {
this.context = context;
this.httpParameter = httpParameter;
+ this.deserializationSchema = deserializationSchema;
}
@Override
@@ -60,7 +63,13 @@ public class HttpSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
try {
HttpResponse response =
httpClient.execute(this.httpParameter.getUrl(), this.httpParameter.getMethod(),
this.httpParameter.getHeaders(), this.httpParameter.getParams());
if (HttpResponse.STATUS_OK == response.getCode()) {
- output.collect(new SeaTunnelRow(new Object[]
{response.getContent()}));
+ String content = response.getContent();
+ if (deserializationSchema != null) {
+ deserializationSchema.deserialize(content.getBytes(),
output);
+ } else {
+ // TODO: use seatunnel-text-format
+ output.collect(new SeaTunnelRow(new Object[]{content}));
+ }
return;
}
LOGGER.error("http client execute exception, http response status
code:[{}], content:[{}]", response.getCode(), response.getContent());