This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cad0e40cc5 [Fix][Connector-V2][GraphQL] Clarify subscription modes and
JSON-only response (#10303)
cad0e40cc5 is described below
commit cad0e40cc5060a2bfe3d1523c5a8012cadcce364
Author: corgy-w <[email protected]>
AuthorDate: Mon Jan 12 22:07:07 2026 +0800
[Fix][Connector-V2][GraphQL] Clarify subscription modes and JSON-only
response (#10303)
---
docs/en/connector-v2/sink/GraphQL.md | 9 ++-------
docs/en/connector-v2/source/GraphQL.md | 11 ++---------
docs/zh/connector-v2/sink/GraphQL.md | 9 ++-------
docs/zh/connector-v2/source/GraphQL.md | 13 +++----------
.../seatunnel/graphql/source/GraphQLSource.java | 18 +++++++++++++++++-
.../connectors/seatunnel/http/source/HttpSource.java | 2 +-
6 files changed, 27 insertions(+), 35 deletions(-)
diff --git a/docs/en/connector-v2/sink/GraphQL.md
b/docs/en/connector-v2/sink/GraphQL.md
index 2ac634083b..df9f1bbee3 100644
--- a/docs/en/connector-v2/sink/GraphQL.md
+++ b/docs/en/connector-v2/sink/GraphQL.md
@@ -26,12 +26,12 @@ Used to launch web hooks using data.
## Supported DataSource Info
-In order to use the Http connector, the following dependencies are required.
+In order to use the GraphQL connector, the following dependencies are required.
They can be downloaded via install-plugin.sh or from the Maven central
repository.
| Datasource | Supported Versions |
Dependency |
|------------|--------------------|------------------------------------------------------------------------------------------------------------------|
-| Http | universal |
[Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-prometheus)
|
+| Http | universal |
[Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-http)
|
## Sink Options
@@ -47,11 +47,6 @@ They can be downloaded via install-plugin.sh or from the
Maven central repositor
| retry_backoff_max_ms | Int | No | 10000 | The maximum
retry-backoff times(millis) if request http failed
|
| connect_timeout_ms | Int | No | 12000 | Connection
timeout setting, default 12s.
|
| socket_timeout_ms | Int | No | 60000 | Socket timeout
setting, default 60s.
|
-| key_timestamp | Int | NO | - | prometheus
timestamp key .
|
-| key_label | String | yes | - | prometheus label
key
|
-| key_value | Double | yes | - | prometheus value
|
-| batch_size | Int | false | 1024 | prometheus
batch size write
|
-| flush_interval | Long | false | 300000L | prometheus
flush commit interval |
| common-options | | No | - | Sink plugin
common parameters, please refer to [Sink Common
Options](../sink-common-options.md) for details |
## Example
diff --git a/docs/en/connector-v2/source/GraphQL.md
b/docs/en/connector-v2/source/GraphQL.md
index 30738ed1f0..2f062c8bf0 100644
--- a/docs/en/connector-v2/source/GraphQL.md
+++ b/docs/en/connector-v2/source/GraphQL.md
@@ -25,7 +25,6 @@ Used to read data from GraphQL.
| timeout | Long | No | - |
| content_field | String | Yes | $.data.{query_object}.* |
| schema.fields | Config | Yes | - |
-| format | String | No | json |
| params | Map | Yes | - |
| poll_interval_millis | int | No | - |
| retry | int | No | - |
@@ -56,8 +55,8 @@ variables = {
### enable_subscription [boolean]
-1. true : Build a socket reader to subscribe to the GraphQL service
-2. false : Build an http reader subscription to the GraphQL service
+1. true : Enable streaming subscription mode (WebSocket)
+2. false : Enable batch query mode (HTTP)
### timeout [Long]
@@ -87,10 +86,6 @@ The retry-backoff times(millis) multiplier if request http
failed
The maximum retry-backoff times(millis) if request http failed
-### format [String]
-
-the format of upstream data, default `json`.
-
### schema [Config]
Fill in a fixed value
@@ -122,7 +117,6 @@ Source plugin common parameters, please refer to [Source
Common Options](../sour
source {
GraphQL {
url = "http://192.168.1.103:9081/v1/graphql"
- format = "json"
content_field = "$.data.source"
query = """
query MyQuery($limit: Int) {
@@ -155,7 +149,6 @@ source {
source {
GraphQL {
url = "http://192.168.1.103:9081/v1/graphql"
- format = "json"
content_field = "$.data.source"
query = """
query MyQuery($limit: Int) {
diff --git a/docs/zh/connector-v2/sink/GraphQL.md
b/docs/zh/connector-v2/sink/GraphQL.md
index b73d9a6117..45210e7157 100644
--- a/docs/zh/connector-v2/sink/GraphQL.md
+++ b/docs/zh/connector-v2/sink/GraphQL.md
@@ -26,11 +26,11 @@ import ChangeLog from '../changelog/connector-graphql.md';
## 支持的数据源信息
-想使用 Prometheus 连接器,需要安装以下必要的依赖。可以通过运行 install-plugin.sh 脚本或者从 Maven 中央仓库下载这些依赖
+想使用 GraphQL 连接器,需要安装以下必要的依赖。可以通过运行 install-plugin.sh 脚本或者从 Maven 中央仓库下载这些依赖
| 数据源 | 支持版本 | 依赖 |
| ------ | --------- |
------------------------------------------------------------ |
-| Http | universal |
[Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-prometheus)
|
+| Http | universal |
[Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-http)
|
## 接收器选项
@@ -46,11 +46,6 @@ import ChangeLog from '../changelog/connector-graphql.md';
| retry_backoff_max_ms | Int | No | 10000 | The maximum
retry-backoff times(millis) if request http failed
|
| connect_timeout_ms | Int | No | 12000 | Connection
timeout setting, default 12s.
|
| socket_timeout_ms | Int | No | 60000 | Socket timeout
setting, default 60s.
|
-| key_timestamp | Int | NO | - | prometheus
timestamp key .
|
-| key_label | String | yes | - | prometheus label
key
|
-| key_value | Double | yes | - | prometheus value
|
-| batch_size | Int | false | 1024 | prometheus
batch size write
|
-| flush_interval | Long | false | 300000L | prometheus
flush commit interval |
| common-options | | No | - | Sink plugin
common parameters, please refer to [Sink Common
Options](../sink-common-options.md) for details |
## 示例
diff --git a/docs/zh/connector-v2/source/GraphQL.md
b/docs/zh/connector-v2/source/GraphQL.md
index ac578af642..c50a3ef06b 100644
--- a/docs/zh/connector-v2/source/GraphQL.md
+++ b/docs/zh/connector-v2/source/GraphQL.md
@@ -11,7 +11,7 @@ import ChangeLog from '../changelog/connector-graphql.md';
## 主要特性
- [x] [批处理](../../concept/connector-v2-features.md)
-- [ ] [流处理](../../concept/connector-v2-features.md)
+- [x] [流处理](../../concept/connector-v2-features.md)
- [ ] [并行](../../concept/connector-v2-features.md)
## 源选项
@@ -25,7 +25,6 @@ import ChangeLog from '../changelog/connector-graphql.md';
| timeout | Long | No | - |
| content_field | String | Yes | $.data.{query_object}.* |
| schema.fields | Config | Yes | - |
-| format | String | No | json |
| params | Map | Yes | - |
| poll_interval_millis | int | No | - |
| retry | int | No | - |
@@ -56,8 +55,8 @@ variables = {
### enable_subscription [boolean]
-1. true : 构建一个套接字读取器来订阅GraphQL服务
-2. false : 构建GraphQL服务的http阅读器订阅
+1. true : 开启流式订阅模式(WebSocket)
+2. false : 开启批处理查询模式(HTTP)
### timeout [Long]
@@ -87,10 +86,6 @@ HTTP请求参数
如果http请求失败,最大重试回退时间(毫秒)
-### format [String]
-
-上游数据的格式,默认为json。
-
### schema [Config]
填写一个固定值
@@ -122,7 +117,6 @@ HTTP请求参数
source {
GraphQL {
url = "http://192.168.1.103:9081/v1/graphql"
- format = "json"
content_field = "$.data.source"
query = """
query MyQuery($limit: Int) {
@@ -155,7 +149,6 @@ source {
source {
GraphQL {
url = "http://192.168.1.103:9081/v1/graphql"
- format = "json"
content_field = "$.data.source"
query = """
query MyQuery($limit: Int) {
diff --git
a/seatunnel-connectors-v2/connector-graphql/src/main/java/org/apache/seatunnel/connectors/seatunnel/graphql/source/GraphQLSource.java
b/seatunnel-connectors-v2/connector-graphql/src/main/java/org/apache/seatunnel/connectors/seatunnel/graphql/source/GraphQLSource.java
index 9e36744bc2..0cceea3d20 100644
---
a/seatunnel-connectors-v2/connector-graphql/src/main/java/org/apache/seatunnel/connectors/seatunnel/graphql/source/GraphQLSource.java
+++
b/seatunnel-connectors-v2/connector-graphql/src/main/java/org/apache/seatunnel/connectors/seatunnel/graphql/source/GraphQLSource.java
@@ -17,8 +17,12 @@
package org.apache.seatunnel.connectors.seatunnel.graphql.source;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.JobMode;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
@@ -26,7 +30,9 @@ import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReader
import
org.apache.seatunnel.connectors.seatunnel.graphql.config.GraphQLSourceParameter;
import
org.apache.seatunnel.connectors.seatunnel.graphql.source.reader.GraphQLSourceHttpReader;
import
org.apache.seatunnel.connectors.seatunnel.graphql.source.reader.GraphQLSourceSocketReader;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSource;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import lombok.extern.slf4j.Slf4j;
@@ -49,7 +55,17 @@ public class GraphQLSource extends HttpSource {
@Override
protected void buildSchemaWithConfig(ReadonlyConfig pluginConfig) {
- super.buildSchemaWithConfig(pluginConfig);
+ if
(pluginConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) {
+ this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
+ this.deserializationSchema = new
JsonDeserializationSchema(catalogTable, false, false);
+ Config config = pluginConfig.toConfig();
+ if (config.hasPath(HttpSourceOptions.JSON_FIELD.key())) {
+ jsonField =
getJsonField(config.getConfig(HttpSourceOptions.JSON_FIELD.key()));
+ }
+ if (config.hasPath(HttpSourceOptions.CONTENT_FIELD.key())) {
+ contentField =
config.getString(HttpSourceOptions.CONTENT_FIELD.key());
+ }
+ }
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index c630bbe644..44629f4f75 100644
---
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -197,7 +197,7 @@ public class HttpSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
pageInfo);
}
- private JsonField getJsonField(Config jsonFieldConf) {
+ protected JsonField getJsonField(Config jsonFieldConf) {
ConfigRenderOptions options = ConfigRenderOptions.concise();
return JsonField.builder()
.fields(JsonUtils.toMap(jsonFieldConf.root().render(options)))