This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 38254e3f2 [Connector-V2] [ElasticSearch] Add ElasticSearch Source/Sink
Factory (#3325)
38254e3f2 is described below
commit 38254e3f26fe38eeecdedd166e56cf25a43c73f2
Author: Hisoka <[email protected]>
AuthorDate: Wed Nov 9 11:09:26 2022 +0800
[Connector-V2] [ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325)
* [Connector-V2] [ElasticSearch] Add ElasticSearch Source/Sink Factory
* [Connector-V2] [ES] Fix doc error
* Update
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
Co-authored-by: Zongwen Li <[email protected]>
* Update
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
Co-authored-by: Zongwen Li <[email protected]>
Co-authored-by: Zongwen Li <[email protected]>
---
docs/en/connector-v2/sink/Elasticsearch.md | 22 +++++-----
docs/en/connector-v2/source/Elasticsearch.md | 10 ++++-
.../seatunnel/common/schema/Schema.java} | 14 +++++--
.../seatunnel/common/schema/SeaTunnelSchema.java | 5 ++-
.../elasticsearch/client/EsRestClient.java | 10 ++---
.../config/EsClusterConnectionConfig.java | 14 +++++--
.../seatunnel/elasticsearch/config/SinkConfig.java | 32 ++++++---------
.../elasticsearch/config/SourceConfig.java | 40 ++++++++++++++++++
.../elasticsearch/config/source/SourceConfig.java | 30 --------------
.../elasticsearch/constant/BulkConfig.java | 39 ------------------
.../seatunnel/elasticsearch/dto/IndexInfo.java | 6 +--
.../elasticsearch/sink/ElasticsearchSink.java | 17 ++++++--
.../sink/ElasticsearchSinkFactory.java | 46 +++++++++++++++++++++
.../sink/ElasticsearchSinkWriter.java | 26 +++++++-----
.../elasticsearch/source/ElasticsearchSource.java | 10 ++---
.../source/ElasticsearchSourceFactory.java | 47 ++++++++++++++++++++++
.../source/ElasticsearchSourceSplitEnumerator.java | 21 +++++-----
.../FakeDataGeneratorTest.java | 2 +-
.../file/hdfs/source/BaseHdfsFileSource.java | 2 +-
.../seatunnel/file/ftp/source/FtpFileSource.java | 4 +-
.../file/local/source/LocalFileSource.java | 4 +-
.../seatunnel/file/oss/source/OssFileSource.java | 2 +-
.../seatunnel/file/s3/source/S3FileSource.java | 4 +-
.../seatunnel/file/sftp/source/SftpFileSource.java | 4 +-
.../seatunnel/neo4j/source/Neo4jSource.java | 4 +-
.../seatunnel/pulsar/source/PulsarSource.java | 4 +-
.../seatunnel/redis/source/RedisSource.java | 4 +-
27 files changed, 259 insertions(+), 164 deletions(-)
diff --git a/docs/en/connector-v2/sink/Elasticsearch.md
b/docs/en/connector-v2/sink/Elasticsearch.md
index d8673b90d..d53dca003 100644
--- a/docs/en/connector-v2/sink/Elasticsearch.md
+++ b/docs/en/connector-v2/sink/Elasticsearch.md
@@ -19,16 +19,16 @@ Engine Supported
## Options
-| name | type | required | default value |
-|----------------|--------|----------|---------------|
-| hosts | array | yes | - |
-| index | string | yes | - |
-| index_type | string | no | |
-| username | string | no | |
-| password | string | no | |
-| max_retry_size | int | no | 3 |
-| max_batch_size | int | no | 10 |
-| common-options | | no | - |
+| name | type | required | default value |
+|-----------------|--------|----------|---------------|
+| hosts | array | yes | - |
+| index | string | yes | - |
+| index_type | string | no | |
+| username | string | no | |
+| password | string | no | |
+| max_retry_count | int | no | 3 |
+| max_batch_size | int | no | 10 |
+| common-options | | no | - |
### hosts [array]
@@ -47,7 +47,7 @@ x-pack username
### password [string]
x-pack password
-### max_retry_size [int]
+### max_retry_count [int]
one bulk request max try size
### max_batch_size [int]
diff --git a/docs/en/connector-v2/source/Elasticsearch.md
b/docs/en/connector-v2/source/Elasticsearch.md
index f10f16eb0..8b3bcb644 100644
--- a/docs/en/connector-v2/source/Elasticsearch.md
+++ b/docs/en/connector-v2/source/Elasticsearch.md
@@ -20,14 +20,15 @@ support version >= 2.x and < 8.x.
## Options
| name | type | required | default value |
-|-------------|--------| -------- |---------------|
+|-------------|--------|----------|---------------|
| hosts | array | yes | - |
| username | string | no | - |
| password | string | no | - |
| index | string | yes | - |
-| source | array | yes | - |
+| source | array | no | - |
| scroll_time | string | no | 1m |
| scroll_size | int | no | 100 |
+| schema | | no | - |
@@ -46,6 +47,7 @@ Elasticsearch index name, support * fuzzy matching.
### source [array]
The fields of index.
You can get the document id by specifying the field `_id`.If sink _id to other
index,you need specify an alias for _id due to the Elasticsearch limit.
+If you don't config source, you must config `schema`.
### scroll_time [String]
Amount of time Elasticsearch will keep the search context alive for scroll
requests.
@@ -53,6 +55,10 @@ Amount of time Elasticsearch will keep the search context
alive for scroll reque
### scroll_size [int]
Maximum number of hits to be returned with each Elasticsearch scroll request.
+### schema
+The structure of the data, including field names and field types.
+If you don't config schema, you must config `source`.
+
## Examples
simple
```hocon
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfigDeaultConstant.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/Schema.java
similarity index 73%
rename from
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfigDeaultConstant.java
rename to
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/Schema.java
index 035b556b6..b0614dc73 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfigDeaultConstant.java
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/Schema.java
@@ -15,12 +15,18 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source;
+package org.apache.seatunnel.connectors.seatunnel.common.schema;
-public class SourceConfigDeaultConstant {
+import org.apache.seatunnel.api.configuration.util.OptionMark;
- public static final String SCROLLL_TIME = "1m";
+import lombok.Data;
- public static final int SCROLLL_SIZE = 100;
+import java.util.Map;
+
+@Data
+public class Schema {
+
+ @OptionMark(description = "The schema fields map")
+ private Map<String, String> fields;
}
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java
index 144a02c24..10b73c7c4 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.common.schema;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
@@ -42,7 +44,8 @@ import java.util.LinkedHashMap;
import java.util.Map;
public class SeaTunnelSchema implements Serializable {
- public static final String SCHEMA = "schema";
+
+ public static final Option<Schema> SCHEMA =
Options.key("schema").objectType(Schema.class).noDefaultValue().withDescription("SeaTunnel
Schema");
private static final String FIELD_KEY = "fields";
private static final String SIMPLE_SCHEMA_FILED = "content";
private final SeaTunnelRowType seaTunnelRowType;
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index 809654b24..e8c21f8f1 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -71,13 +71,13 @@ public class EsRestClient {
}
public static EsRestClient createInstance(Config pluginConfig) {
- List<String> hosts =
pluginConfig.getStringList(EsClusterConnectionConfig.HOSTS);
+ List<String> hosts =
pluginConfig.getStringList(EsClusterConnectionConfig.HOSTS.key());
String username = null;
String password = null;
- if (pluginConfig.hasPath(EsClusterConnectionConfig.USERNAME)) {
- username =
pluginConfig.getString(EsClusterConnectionConfig.USERNAME);
- if (pluginConfig.hasPath(EsClusterConnectionConfig.PASSWORD)) {
- password =
pluginConfig.getString(EsClusterConnectionConfig.PASSWORD);
+ if (pluginConfig.hasPath(EsClusterConnectionConfig.USERNAME.key())) {
+ username =
pluginConfig.getString(EsClusterConnectionConfig.USERNAME.key());
+ if
(pluginConfig.hasPath(EsClusterConnectionConfig.PASSWORD.key())) {
+ password =
pluginConfig.getString(EsClusterConnectionConfig.PASSWORD.key());
}
}
return createInstance(hosts, username, password);
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java
index fd482db5e..5fee80878 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/EsClusterConnectionConfig.java
@@ -17,12 +17,20 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.List;
+
public class EsClusterConnectionConfig {
- public static final String HOSTS = "hosts";
+ public static final Option<List<String>> HOSTS =
Options.key("hosts").listType().noDefaultValue()
+ .withDescription("Elasticsearch cluster http address, the format is
host:port, allowing multiple hosts to be specified. Such as [\"host1:9200\",
\"host2:9200\"]");
- public static final String USERNAME = "username";
+ public static final Option<String> USERNAME =
Options.key("username").stringType().noDefaultValue()
+ .withDescription("x-pack username");
- public static final String PASSWORD = "password";
+ public static final Option<String> PASSWORD =
Options.key("password").stringType().noDefaultValue()
+ .withDescription("x-pack password");
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
index 6dc753bb2..e819e618c 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java
@@ -17,31 +17,23 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.BulkConfig;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
public class SinkConfig {
- public static final String INDEX = "index";
+ public static final Option<String> INDEX =
Options.key("index").stringType().noDefaultValue()
+ .withDescription("Elasticsearch index name.Index support contains
variables of field name,such as seatunnel_${age},and the field must appear at
seatunnel row. If not, we will treat it as a normal index");
- public static final String INDEX_TYPE = "index_type";
+ public static final Option<String> INDEX_TYPE =
Options.key("index_type").stringType().noDefaultValue()
+ .withDescription("Elasticsearch index type, it is recommended not to
specify in elasticsearch 6 and above");
- public static final String USERNAME = "username";
+ @SuppressWarnings("checkstyle:MagicNumber")
+ public static final Option<Integer> MAX_BATCH_SIZE =
Options.key("max_batch_size").intType().defaultValue(10)
+ .withDescription("batch bulk doc max size");
- public static final String PASSWORD = "password";
-
- public static final String HOSTS = "hosts";
-
- public static final String MAX_BATCH_SIZE = "max_batch_size";
-
- public static final String MAX_RETRY_SIZE = "max_retry_size";
-
- public static void
setValue(org.apache.seatunnel.shade.com.typesafe.config.Config pluginConfig) {
- if (pluginConfig.hasPath(MAX_BATCH_SIZE)) {
- BulkConfig.MAX_BATCH_SIZE = pluginConfig.getInt(MAX_BATCH_SIZE);
- }
- if (pluginConfig.hasPath(MAX_RETRY_SIZE)) {
- BulkConfig.MAX_RETRY_SIZE = pluginConfig.getInt(MAX_RETRY_SIZE);
- }
- }
+ @SuppressWarnings("checkstyle:MagicNumber")
+ public static final Option<Integer> MAX_RETRY_COUNT =
Options.key("max_retry_count").intType().defaultValue(3)
+ .withDescription("one bulk request max try count");
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
new file mode 100644
index 000000000..11a1f6576
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.List;
+
+public class SourceConfig {
+
+ public static final Option<String> INDEX =
Options.key("index").stringType().noDefaultValue()
+ .withDescription("Elasticsearch index name, support * fuzzy matching");
+
+ public static final Option<List<String>> SOURCE =
Options.key("source").listType().noDefaultValue()
+ .withDescription("The fields of index. You can get the document id by
specifying the field _id.If sink _id to other index,you need specify an alias
for _id due to the Elasticsearch limit");
+
+ public static final Option<String> SCROLL_TIME =
Options.key("scroll_time").stringType().defaultValue("1m")
+ .withDescription("Amount of time Elasticsearch will keep the search
context alive for scroll requests");
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ public static final Option<Integer> SCROLL_SIZE =
Options.key("scroll_size").intType().defaultValue(100)
+ .withDescription("Maximum number of hits to be returned with each
Elasticsearch scroll request");
+
+}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfig.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfig.java
deleted file mode 100644
index 804080307..000000000
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/source/SourceConfig.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source;
-
-public class SourceConfig {
-
- public static final String INDEX = "index";
-
- public static final String SOURCE = "source";
-
- public static final String SCROLL_TIME = "scroll_time";
-
- public static final String SCROLL_SIZE = "scroll_size";
-
-}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
deleted file mode 100644
index dba8b8dd1..000000000
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/BulkConfig.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant;
-
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
-
-/**
- * bulk es config
- */
-public class BulkConfig {
- /**
- * once bulk es include max document size
- * {@link SinkConfig#MAX_BATCH_SIZE}
- */
- @SuppressWarnings("checkstyle:MagicNumber")
- public static int MAX_BATCH_SIZE = 10;
-
- /**
- * the max retry size of bulk es
- * {@link SinkConfig#MAX_RETRY_SIZE}
- */
- @SuppressWarnings("checkstyle:MagicNumber")
- public static int MAX_RETRY_SIZE = 3;
-}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
index dad62cf74..73d5da08b 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java
@@ -33,9 +33,9 @@ public class IndexInfo {
private String type;
public IndexInfo(Config pluginConfig) {
- index = pluginConfig.getString(SinkConfig.INDEX);
- if (pluginConfig.hasPath(SinkConfig.INDEX_TYPE)) {
- type = pluginConfig.getString(SinkConfig.INDEX_TYPE);
+ index = pluginConfig.getString(SinkConfig.INDEX.key());
+ if (pluginConfig.hasPath(SinkConfig.INDEX_TYPE.key())) {
+ type = pluginConfig.getString(SinkConfig.INDEX_TYPE.key());
}
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
index 3924c7199..48b7d2ed1 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -17,13 +17,15 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_BATCH_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_RETRY_COUNT;
+
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState;
@@ -41,6 +43,10 @@ public class ElasticsearchSink implements
SeaTunnelSink<SeaTunnelRow, Elasticsea
private Config pluginConfig;
private SeaTunnelRowType seaTunnelRowType;
+ private int maxBatchSize = MAX_BATCH_SIZE.defaultValue();
+
+ private int maxRetryCount = MAX_RETRY_COUNT.defaultValue();
+
@Override
public String getPluginName() {
return "Elasticsearch";
@@ -49,7 +55,12 @@ public class ElasticsearchSink implements
SeaTunnelSink<SeaTunnelRow, Elasticsea
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
this.pluginConfig = pluginConfig;
- SinkConfig.setValue(pluginConfig);
+ if (pluginConfig.hasPath(MAX_BATCH_SIZE.key())) {
+ maxBatchSize = pluginConfig.getInt(MAX_BATCH_SIZE.key());
+ }
+ if (pluginConfig.hasPath(MAX_RETRY_COUNT.key())) {
+ maxRetryCount = pluginConfig.getInt(MAX_RETRY_COUNT.key());
+ }
}
@Override
@@ -64,7 +75,7 @@ public class ElasticsearchSink implements
SeaTunnelSink<SeaTunnelRow, Elasticsea
@Override
public SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo,
ElasticsearchSinkState> createWriter(SinkWriter.Context context) {
- return new ElasticsearchSinkWriter(context, seaTunnelRowType,
pluginConfig, Collections.emptyList());
+ return new ElasticsearchSinkWriter(context, seaTunnelRowType,
pluginConfig, maxBatchSize, maxRetryCount, Collections.emptyList());
}
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
new file mode 100644
index 000000000..5f5778e76
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
+
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.HOSTS;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.USERNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.INDEX;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.INDEX_TYPE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_BATCH_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_RETRY_COUNT;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class ElasticsearchSinkFactory implements TableSinkFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "Elasticsearch";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder().required(HOSTS, INDEX)
+ .optional(INDEX_TYPE, USERNAME, PASSWORD, MAX_RETRY_COUNT,
MAX_BATCH_SIZE).build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index 1bc2ef55f..682afec58 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.BulkConfig;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
@@ -40,39 +39,46 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
-@Slf4j
/**
* ElasticsearchSinkWriter is a sink writer that will write {@link
SeaTunnelRow} to Elasticsearch.
*/
+@Slf4j
public class ElasticsearchSinkWriter implements SinkWriter<SeaTunnelRow,
ElasticsearchCommitInfo, ElasticsearchSinkState> {
private final SinkWriter.Context context;
+ private final int maxBatchSize;
+
+ private final int maxRetryCount;
+
private final SeaTunnelRowSerializer seaTunnelRowSerializer;
private final List<String> requestEsList;
private EsRestClient esRestClient;
public ElasticsearchSinkWriter(
- SinkWriter.Context context,
- SeaTunnelRowType seaTunnelRowType,
- Config pluginConfig,
- List<ElasticsearchSinkState> elasticsearchStates) {
+ SinkWriter.Context context,
+ SeaTunnelRowType seaTunnelRowType,
+ Config pluginConfig,
+ int maxBatchSize, int maxRetryCount,
+ List<ElasticsearchSinkState> elasticsearchStates) {
this.context = context;
+ this.maxBatchSize = maxBatchSize;
+ this.maxRetryCount = maxRetryCount;
IndexInfo indexInfo = new IndexInfo(pluginConfig);
esRestClient = EsRestClient.createInstance(pluginConfig);
ElasticsearchVersion elasticsearchVersion =
ElasticsearchVersion.get(esRestClient.getClusterVersion());
this.seaTunnelRowSerializer = new
ElasticsearchRowSerializer(elasticsearchVersion, indexInfo, seaTunnelRowType);
- this.requestEsList = new ArrayList<>(BulkConfig.MAX_BATCH_SIZE);
+ this.requestEsList = new ArrayList<>(maxBatchSize);
}
@Override
public void write(SeaTunnelRow element) {
String indexRequestRow = seaTunnelRowSerializer.serializeRow(element);
requestEsList.add(indexRequestRow);
- if (requestEsList.size() >= BulkConfig.MAX_BATCH_SIZE) {
- bulkEsWithRetry(this.esRestClient, this.requestEsList,
BulkConfig.MAX_RETRY_SIZE);
+ if (requestEsList.size() >= maxBatchSize) {
+ bulkEsWithRetry(this.esRestClient, this.requestEsList,
maxRetryCount);
requestEsList.clear();
}
}
@@ -110,7 +116,7 @@ public class ElasticsearchSinkWriter implements
SinkWriter<SeaTunnelRow, Elastic
@Override
public void close() throws IOException {
- bulkEsWithRetry(this.esRestClient, this.requestEsList,
BulkConfig.MAX_RETRY_SIZE);
+ bulkEsWithRetry(this.esRestClient, this.requestEsList, maxRetryCount);
esRestClient.close();
}
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
index 674bc2da4..9339828c8 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java
@@ -27,7 +27,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source.SourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.EsTypeMappingSeaTunnelType;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -56,14 +56,14 @@ public class ElasticsearchSource implements
SeaTunnelSource<SeaTunnelRow, Elasti
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
this.pluginConfig = pluginConfig;
- if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
- Config schemaConfig =
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+ if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
+ Config schemaConfig =
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
rowTypeInfo =
SeaTunnelSchema.buildWithConfig(schemaConfig).getSeaTunnelRowType();
source = Arrays.asList(rowTypeInfo.getFieldNames());
} else {
- source = pluginConfig.getStringList(SourceConfig.SOURCE);
+ source = pluginConfig.getStringList(SourceConfig.SOURCE.key());
EsRestClient esRestClient =
EsRestClient.createInstance(this.pluginConfig);
- Map<String, String> esFieldType =
esRestClient.getFieldTypeMapping(pluginConfig.getString(SourceConfig.INDEX),
source);
+ Map<String, String> esFieldType =
esRestClient.getFieldTypeMapping(pluginConfig.getString(SourceConfig.INDEX.key()),
source);
esRestClient.close();
SeaTunnelDataType[] fieldTypes = new
SeaTunnelDataType[source.size()];
for (int i = 0; i < source.size(); i++) {
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
new file mode 100644
index 000000000..1f00a5b21
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
+
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.HOSTS;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.USERNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.INDEX;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SCROLL_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SCROLL_TIME;
+import static
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig.SOURCE;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class ElasticsearchSourceFactory implements TableSourceFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "Elasticsearch";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder().required(HOSTS, INDEX).optional(USERNAME,
PASSWORD, SCROLL_TIME, SCROLL_SIZE)
+ .exclusive(SOURCE, SeaTunnelSchema.SCHEMA).build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
index 2ba7c58ac..fafd749fd 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
@@ -19,8 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source.SourceConfig;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source.SourceConfigDeaultConstant;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.SourceIndexInfo;
@@ -131,20 +130,20 @@ public class ElasticsearchSourceSplitEnumerator
implements SourceSplitEnumerator
private List<ElasticsearchSourceSplit> getElasticsearchSplit() {
List<ElasticsearchSourceSplit> splits = new ArrayList<>();
- String scrolllTime = SourceConfigDeaultConstant.SCROLLL_TIME;
- if (pluginConfig.hasPath(SourceConfig.SCROLL_TIME)) {
- scrolllTime = pluginConfig.getString(SourceConfig.SCROLL_TIME);
+ String scrollTime = SourceConfig.SCROLL_TIME.defaultValue();
+ if (pluginConfig.hasPath(SourceConfig.SCROLL_TIME.key())) {
+ scrollTime =
pluginConfig.getString(SourceConfig.SCROLL_TIME.key());
}
- int scrollSize = SourceConfigDeaultConstant.SCROLLL_SIZE;
- if (pluginConfig.hasPath(SourceConfig.SCROLL_SIZE)) {
- scrollSize = pluginConfig.getInt(SourceConfig.SCROLL_SIZE);
+ int scrollSize = SourceConfig.SCROLL_SIZE.defaultValue();
+ if (pluginConfig.hasPath(SourceConfig.SCROLL_SIZE.key())) {
+ scrollSize = pluginConfig.getInt(SourceConfig.SCROLL_SIZE.key());
}
- List<IndexDocsCount> indexDocsCounts =
esRestClient.getIndexDocsCount(pluginConfig.getString(SourceConfig.INDEX));
+ List<IndexDocsCount> indexDocsCounts =
esRestClient.getIndexDocsCount(pluginConfig.getString(SourceConfig.INDEX.key()));
indexDocsCounts = indexDocsCounts.stream().filter(x ->
x.getDocsCount() != null && x.getDocsCount() > 0)
-
.sorted(Comparator.comparingLong(IndexDocsCount::getDocsCount)).collect(Collectors.toList());
+
.sorted(Comparator.comparingLong(IndexDocsCount::getDocsCount)).collect(Collectors.toList());
for (IndexDocsCount indexDocsCount : indexDocsCounts) {
- splits.add(new
ElasticsearchSourceSplit(String.valueOf(indexDocsCount.getIndex().hashCode()),
new SourceIndexInfo(indexDocsCount.getIndex(), source, scrolllTime,
scrollSize)));
+ splits.add(new
ElasticsearchSourceSplit(String.valueOf(indexDocsCount.getIndex().hashCode()),
new SourceIndexInfo(indexDocsCount.getIndex(), source, scrollTime,
scrollSize)));
}
return splits;
}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
index 15cf1592e..6e58e30ba 100644
---
a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
+++
b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
@@ -43,7 +43,7 @@ public class FakeDataGeneratorTest {
@ValueSource(strings = {"complex.schema.conf", "simple.schema.conf"})
public void testComplexSchemaParse(String conf) throws
FileNotFoundException, URISyntaxException {
Config testConfig = getTestConfigFile(conf);
- SeaTunnelSchema seaTunnelSchema =
SeaTunnelSchema.buildWithConfig(testConfig.getConfig(SeaTunnelSchema.SCHEMA));
+ SeaTunnelSchema seaTunnelSchema =
SeaTunnelSchema.buildWithConfig(testConfig.getConfig(SeaTunnelSchema.SCHEMA.key()));
SeaTunnelRowType seaTunnelRowType =
seaTunnelSchema.getSeaTunnelRowType();
FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
FakeDataGenerator fakeDataGenerator = new
FakeDataGenerator(seaTunnelSchema, fakeConfig);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 311a6dd04..161c3a921 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -59,7 +59,7 @@ public abstract class BaseHdfsFileSource extends
BaseFileSource {
case CSV:
case TEXT:
case JSON:
- Config schemaConfig =
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+ Config schemaConfig =
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
.buildWithConfig(schemaConfig)
.getSeaTunnelRowType();
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index ab34a6c2b..abcb511d5 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -68,12 +68,12 @@ public class FtpFileSource extends BaseFileSource {
}
// support user-defined schema
// only json type support user-defined schema now
- if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
+ if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
switch (fileFormat) {
case CSV:
case TEXT:
case JSON:
- Config schemaConfig =
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+ Config schemaConfig =
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
.buildWithConfig(schemaConfig)
.getSeaTunnelRowType();
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index df04acde4..6cb4f9fa3 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -65,12 +65,12 @@ public class LocalFileSource extends BaseFileSource {
// support user-defined schema
FileFormat fileFormat =
FileFormat.valueOf(pluginConfig.getString(LocalSourceConfig.FILE_TYPE).toUpperCase());
// only json text csv type support user-defined schema now
- if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
+ if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
switch (fileFormat) {
case CSV:
case TEXT:
case JSON:
- Config schemaConfig =
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+ Config schemaConfig =
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
.buildWithConfig(schemaConfig)
.getSeaTunnelRowType();
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index a99d0e2ab..ec3926d4f 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -71,7 +71,7 @@ public class OssFileSource extends BaseFileSource {
case CSV:
case TEXT:
case JSON:
- Config schemaConfig =
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+ Config schemaConfig =
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
.buildWithConfig(schemaConfig)
.getSeaTunnelRowType();
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
index d77c3d4e5..76d584c89 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
@@ -65,12 +65,12 @@ public class S3FileSource extends BaseFileSource {
// support user-defined schema
FileFormat fileFormat =
FileFormat.valueOf(pluginConfig.getString(S3Config.FILE_TYPE).toUpperCase());
// only json text csv type support user-defined schema now
- if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
+ if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
switch (fileFormat) {
case CSV:
case TEXT:
case JSON:
- Config schemaConfig =
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+ Config schemaConfig =
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
.buildWithConfig(schemaConfig)
.getSeaTunnelRowType();
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
index 7bf20ebeb..39c27739d 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
@@ -68,12 +68,12 @@ public class SftpFileSource extends BaseFileSource {
}
// support user-defined schema
// only json csv text type support user-defined schema now
- if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
+ if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
switch (fileFormat) {
case CSV:
case TEXT:
case JSON:
- Config schemaConfig =
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+ Config schemaConfig =
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
.buildWithConfig(schemaConfig)
.getSeaTunnelRowType();
diff --git
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
index dc52b91ad..26c9e524e 100644
---
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
+++
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
@@ -66,13 +66,13 @@ public class Neo4jSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
public void prepare(Config pluginConfig) throws PrepareFailException {
neo4jSourceConfig.setDriverBuilder(prepareDriver(pluginConfig));
- final CheckResult configCheck =
CheckConfigUtil.checkAllExists(pluginConfig, KEY_QUERY, SeaTunnelSchema.SCHEMA);
+ final CheckResult configCheck =
CheckConfigUtil.checkAllExists(pluginConfig, KEY_QUERY,
SeaTunnelSchema.SCHEMA.key());
if (!configCheck.isSuccess()) {
throw new PrepareFailException(Neo4jSourceConfig.PLUGIN_NAME,
PluginType.SOURCE, configCheck.getMsg());
}
neo4jSourceConfig.setQuery(pluginConfig.getString(KEY_QUERY));
- this.rowType =
SeaTunnelSchema.buildWithConfig(pluginConfig.getConfig(SeaTunnelSchema.SCHEMA)).getSeaTunnelRowType();
+ this.rowType =
SeaTunnelSchema.buildWithConfig(pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key())).getSeaTunnelRowType();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
index d0c481569..a52d63b2b 100644
---
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
+++
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
@@ -152,7 +152,7 @@ public class PulsarSource<T> implements SeaTunnelSource<T,
PulsarPartitionSplit,
setPartitionDiscoverer(config);
setDeserialization(config);
- if ((partitionDiscoverer instanceof TopicPatternDiscoverer)
+ if (partitionDiscoverer instanceof TopicPatternDiscoverer
&& partitionDiscoveryIntervalMs > 0
&& Boundedness.BOUNDED == stopCursor.getBoundedness()) {
throw new IllegalArgumentException("Bounded streams do not support
dynamic partition discovery.");
@@ -226,7 +226,7 @@ public class PulsarSource<T> implements SeaTunnelSource<T,
PulsarPartitionSplit,
private void setDeserialization(Config config) {
String format = config.getString("format");
// TODO: format SPI
- SeaTunnelRowType rowType =
SeaTunnelSchema.buildWithConfig(config.getConfig(SeaTunnelSchema.SCHEMA)).getSeaTunnelRowType();
+ SeaTunnelRowType rowType =
SeaTunnelSchema.buildWithConfig(config.getConfig(SeaTunnelSchema.SCHEMA.key())).getSeaTunnelRowType();
deserialization = (DeserializationSchema<T>) new
JsonDeserializationSchema(false, false, rowType);
}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
index 407d32222..2fe258d26 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
@@ -57,8 +57,8 @@ public class RedisSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
}
this.redisParameters.buildWithConfig(pluginConfig);
- if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
- Config schema = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+ if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) {
+ Config schema =
pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key());
this.seaTunnelRowType =
SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
} else {
this.seaTunnelRowType = SeaTunnelSchema.buildSimpleTextSchema();