This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a5447528c [Imporve][Fake-Connector-V2]support user-defined-schmea and
random data for fake-table (#2406)
a5447528c is described below
commit a5447528c34e9c0b5452ef4f175dd32d3636c0e6
Author: Laglangyue <[email protected]>
AuthorDate: Tue Aug 23 09:45:05 2022 +0800
[Imporve][Fake-Connector-V2]support user-defined-schmea and random data for
fake-table (#2406)
* [Connector-V2][JDBC-connector] optimization fake
Co-authored-by: tangjiafu <[email protected]>
---
docs/en/connector-v2/sink/Assert.md | 12 +-
docs/en/connector-v2/source/FakeSource.md | 76 ++++++++++++
.../seatunnel/common/schema/SeatunnelSchema.java | 3 +-
.../seatunnel/fake/source/FakeRandomData.java | 133 +++++++++++++++++++++
.../seatunnel/fake/source/FakeSource.java | 12 +-
.../seatunnel/fake/source/FakeSourceReader.java | 16 +--
.../FakeRandomDataTest.java | 87 ++++++++++++++
.../src/test/resources/complex.schema.conf | 56 ++++-----
.../src/test/resources/simple.schema.conf | 55 ++++-----
.../resources/assertion/fakesource_to_assert.conf | 20 ++--
.../test/resources/fake/fakesource_to_console.conf | 18 +--
.../resources/file/fakesource_to_hdfs_json.conf | 11 +-
.../resources/file/fakesource_to_hdfs_parquet.conf | 11 +-
.../resources/file/fakesource_to_hdfs_text.conf | 11 +-
.../resources/file/fakesource_to_local_json.conf | 11 +-
.../file/fakesource_to_local_parquet.conf | 11 +-
.../resources/file/fakesource_to_local_text.conf | 13 +-
.../test/resources/iotdb/fakesource_to_iotdb.conf | 9 +-
.../test/resources/jdbc/fakesource_to_jdbc.conf | 12 +-
.../test/resources/jdbc/jdbcsource_to_console.conf | 4 +-
.../test/resources/fake/fakesource_to_console.conf | 10 +-
.../resources/file/fakesource_to_hdfs_json.conf | 13 +-
.../resources/file/fakesource_to_hdfs_parquet.conf | 13 +-
.../resources/file/fakesource_to_hdfs_text.conf | 13 +-
.../resources/file/fakesource_to_local_json.conf | 11 +-
.../file/fakesource_to_local_parquet.conf | 11 +-
.../resources/file/fakesource_to_local_text.conf | 11 +-
.../test/resources/iotdb/fakesource_to_iotdb.conf | 7 +-
.../main/resources/examples/fake_to_console.conf | 13 +-
.../main/resources/examples/fake_to_dingtalk.conf | 13 +-
.../resources/examples/fakesource_to_file.conf | 13 +-
.../resources/examples/spark.batch.clickhouse.conf | 9 +-
.../src/main/resources/examples/spark.batch.conf | 29 ++++-
.../translation/serialization/RowConverter.java | 4 +-
34 files changed, 570 insertions(+), 181 deletions(-)
diff --git a/docs/en/connector-v2/sink/Assert.md
b/docs/en/connector-v2/sink/Assert.md
index ca9332d0c..9e5c49acf 100644
--- a/docs/en/connector-v2/sink/Assert.md
+++ b/docs/en/connector-v2/sink/Assert.md
@@ -37,13 +37,11 @@ A list value rule define the data value validation
### rule_type [string]
The following rules are supported for now
-`
-NOT_NULL, // value can't be null
-MIN, // define the minimum value of data
-MAX, // define the maximum value of data
-MIN_LENGTH, // define the minimum string length of a string data
-MAX_LENGTH // define the maximum string length of a string data
-`
+- NOT_NULL `value can't be null`
+- MIN `define the minimum value of data`
+- MAX `define the maximum value of data`
+- MIN_LENGTH `define the minimum string length of a string data`
+- MAX_LENGTH `define the maximum string length of a string data`
### rule_value [double]
diff --git a/docs/en/connector-v2/source/FakeSource.md
b/docs/en/connector-v2/source/FakeSource.md
new file mode 100644
index 000000000..9c4bf4ffd
--- /dev/null
+++ b/docs/en/connector-v2/source/FakeSource.md
@@ -0,0 +1,76 @@
+# FakeSource
+
+> FakeSource connector
+
+## Description
+
+The FakeSource is a virtual data source, which randomly generates the number
of rows according to the data structure of the user-defined schema,
+just for testing, such as type conversion and feature testing
+
+## Options
+
+| name | type | required | default value |
+|-------------------|--------|----------|---------------|
+| result_table_name | string | yes | - |
+| schema | config | yes | - |
+
+### result_table_name [string]
+
+The table name.
+
+### type [string]
+Table structure description ,you should assign schema option to tell connector
how to parse data to the row you want.
+**Tips**: Most of Unstructured-Datasource contain this param, such as
LocalFile,HdfsFile.
+**Example**:
+```hocon
+schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_time = time
+ c_timestamp = timestamp
+ }
+ }
+```
+
+## Example
+Simple source for FakeSource which contains enough datatype
+```hocon
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_time = time
+ c_timestamp = timestamp
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+```
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
index 4c4799b21..835d1d0ca 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
@@ -32,9 +32,10 @@ import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
+import java.io.Serializable;
import java.util.Map;
-public class SeatunnelSchema {
+public class SeatunnelSchema implements Serializable {
public static final String SCHEMA = "schema";
private static final String FIELD_KEY = "fields";
private static final String SIMPLE_SCHEMA_FILED = "content";
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
new file mode 100644
index 000000000..efafde5ca
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.BYTE_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.DOUBLE_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.SHORT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.VOID_TYPE;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.RandomUtils;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+public class FakeRandomData {
+ public static final String SCHEMA = "schema";
+ private final SeatunnelSchema schema;
+
+ public FakeRandomData(SeatunnelSchema schema) {
+ this.schema = schema;
+ }
+
+ public SeaTunnelRow randomRow() {
+ SeaTunnelRowType seaTunnelRowType = schema.getSeaTunnelRowType();
+ String[] fieldNames = seaTunnelRowType.getFieldNames();
+ SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+ List<Object> randomRow = new ArrayList<>(fieldNames.length);
+ for (SeaTunnelDataType<?> fieldType : fieldTypes) {
+ randomRow.add(randomColumnValue(fieldType));
+ }
+ return new SeaTunnelRow(randomRow.toArray());
+ }
+
+ @SuppressWarnings("magicnumber")
+ private Object randomColumnValue(SeaTunnelDataType<?> fieldType) {
+ if (BOOLEAN_TYPE.equals(fieldType)) {
+ return RandomUtils.nextInt(0, 2) == 1;
+ } else if (BYTE_TYPE.equals(fieldType)) {
+ return (byte) RandomUtils.nextInt(0, 255);
+ } else if (SHORT_TYPE.equals(fieldType)) {
+ return (short) RandomUtils.nextInt(Byte.MAX_VALUE,
Short.MAX_VALUE);
+ } else if (INT_TYPE.equals(fieldType)) {
+ return RandomUtils.nextInt(Short.MAX_VALUE, Integer.MAX_VALUE);
+ } else if (LONG_TYPE.equals(fieldType)) {
+ return RandomUtils.nextLong(Integer.MAX_VALUE, Long.MAX_VALUE);
+ } else if (FLOAT_TYPE.equals(fieldType)) {
+ return RandomUtils.nextFloat(Float.MIN_VALUE, Float.MAX_VALUE);
+ } else if (DOUBLE_TYPE.equals(fieldType)) {
+ return RandomUtils.nextDouble(Float.MAX_VALUE, Double.MAX_VALUE);
+ } else if (STRING_TYPE.equals(fieldType)) {
+ return RandomStringUtils.randomAlphabetic(10);
+ } else if (LocalTimeType.LOCAL_DATE_TYPE.equals(fieldType)) {
+ return randomLocalDateTime().toLocalDate();
+ } else if (LocalTimeType.LOCAL_TIME_TYPE.equals(fieldType)) {
+ return randomLocalDateTime().toLocalTime();
+ } else if (LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(fieldType)) {
+ return randomLocalDateTime();
+ } else if (fieldType instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType) fieldType;
+ return new
BigDecimal(RandomStringUtils.randomNumeric(decimalType.getPrecision() -
decimalType.getScale()) + "." +
+ RandomStringUtils.randomNumeric(decimalType.getScale()));
+ } else if (fieldType instanceof ArrayType) {
+ ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
+ BasicType<?> elementType = arrayType.getElementType();
+ Object value = randomColumnValue(elementType);
+ Object arr = Array.newInstance(elementType.getTypeClass(), 1);
+ Array.set(arr, 0, value);
+ return arr;
+ } else if (fieldType instanceof MapType) {
+ MapType<?, ?> mapType = (MapType<?, ?>) fieldType;
+ SeaTunnelDataType<?> keyType = mapType.getKeyType();
+ Object key = randomColumnValue(keyType);
+ SeaTunnelDataType<?> valueType = mapType.getValueType();
+ Object value = randomColumnValue(valueType);
+ HashMap<Object, Object> objectObjectHashMap = new HashMap<>();
+ objectObjectHashMap.put(key, value);
+ return objectObjectHashMap;
+ } else if (fieldType instanceof PrimitiveByteArrayType) {
+ return RandomUtils.nextBytes(100);
+ } else if (VOID_TYPE.equals(fieldType) || fieldType == null) {
+ return Void.TYPE;
+ } else {
+ throw new UnsupportedOperationException("Unexpected value: " +
fieldType);
+ }
+ }
+
+ @SuppressWarnings("magicnumber")
+ private LocalDateTime randomLocalDateTime() {
+ return LocalDateTime.of(
+ LocalDateTime.now().getYear(),
+ RandomUtils.nextInt(1, 12),
+ RandomUtils.nextInt(1, LocalDateTime.now().getDayOfMonth()),
+ RandomUtils.nextInt(0, 24),
+ RandomUtils.nextInt(0, 59)
+ );
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 65e6587f4..4e7b1317e 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -20,11 +20,10 @@ package
org.apache.seatunnel.connectors.seatunnel.fake.source;
import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
@@ -38,6 +37,7 @@ public class FakeSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
private Config pluginConfig;
private SeaTunnelContext seaTunnelContext;
+ private SeatunnelSchema schema;
@Override
public Boundedness getBoundedness() {
@@ -46,14 +46,12 @@ public class FakeSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
@Override
public SeaTunnelRowType getProducedType() {
- return new SeaTunnelRowType(
- new String[]{"name", "age", "timestamp"},
- new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE,
BasicType.INT_TYPE, BasicType.LONG_TYPE});
+ return schema.getSeaTunnelRowType();
}
@Override
public AbstractSingleSplitReader<SeaTunnelRow>
createReader(SingleSplitReaderContext readerContext) throws Exception {
- return new FakeSourceReader(readerContext);
+ return new FakeSourceReader(readerContext, new FakeRandomData(schema));
}
@Override
@@ -64,6 +62,8 @@ public class FakeSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
@Override
public void prepare(Config pluginConfig) {
this.pluginConfig = pluginConfig;
+ assert pluginConfig.hasPath(FakeRandomData.SCHEMA);
+ this.schema =
SeatunnelSchema.buildWithConfig(pluginConfig.getConfig(FakeRandomData.SCHEMA));
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index 7007eaf25..d67e1f4b2 100644
---
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -26,20 +26,17 @@ import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReader
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
private static final Logger LOGGER =
LoggerFactory.getLogger(FakeSourceReader.class);
private final SingleSplitReaderContext context;
- private final String[] names = {"Wenjun", "Fanjia", "Zongwen",
"CalvinKirs"};
- private final int[] ages = {11, 22, 33, 44};
+ private final FakeRandomData fakeRandomData;
- public FakeSourceReader(SingleSplitReaderContext context) {
+ public FakeSourceReader(SingleSplitReaderContext context, FakeRandomData
randomData) {
this.context = context;
+ this.fakeRandomData = randomData;
}
@Override
@@ -56,11 +53,8 @@ public class FakeSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
@SuppressWarnings("magicnumber")
public void pollNext(Collector<SeaTunnelRow> output) throws
InterruptedException {
// Generate a random number of rows to emit.
- Random random = ThreadLocalRandom.current();
- int size = random.nextInt(10) + 1;
- for (int i = 0; i < size; i++) {
- int randomIndex = random.nextInt(names.length);
- SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new
Object[]{names[randomIndex], ages[randomIndex], System.currentTimeMillis()});
+ for (int i = 0; i < 10; i++) {
+ SeaTunnelRow seaTunnelRow = fakeRandomData.randomRow();
output.collect(seaTunnelRow);
}
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
diff --git
a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeRandomDataTest.java
b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeRandomDataTest.java
new file mode 100644
index 000000000..67906a161
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeRandomDataTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.lang.reflect.Array;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.Map;
+
+public class FakeRandomDataTest {
+
+ @ParameterizedTest
+ @ValueSource(strings = {"complex.schema.conf", "simple.schema.conf"})
+ public void testComplexSchemaParse(String conf) throws
FileNotFoundException, URISyntaxException {
+ Config testConfigFile = getTestConfigFile(conf);
+ SeatunnelSchema seatunnelSchema =
SeatunnelSchema.buildWithConfig(testConfigFile);
+ FakeRandomData fakeRandomData = new FakeRandomData(seatunnelSchema);
+ SeaTunnelRow seaTunnelRow = fakeRandomData.randomRow();
+ Assertions.assertNotNull(seaTunnelRow);
+ Object[] fields = seaTunnelRow.getFields();
+ Assertions.assertNotNull(fields);
+ SeaTunnelRowType seaTunnelRowType =
seatunnelSchema.getSeaTunnelRowType();
+ SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+ for (int i = 0; i < fieldTypes.length; i++) {
+ if (fieldTypes[i].getSqlType() != SqlType.NULL) {
+ Assertions.assertNotNull(fields[i]);
+ } else {
+ Assertions.assertSame(fields[i], Void.TYPE);
+ }
+ if (fieldTypes[i].getSqlType() == SqlType.MAP) {
+ Assertions.assertTrue(fields[i] instanceof Map);
+ Map<?, ?> field = (Map) fields[i];
+ field.forEach((k, v) -> Assertions.assertTrue(k != null && v
!= null));
+ }
+ if (fieldTypes[i].getSqlType() == SqlType.ARRAY) {
+ Assertions.assertTrue(fields[i].getClass().isArray());
+ Assertions.assertNotNull(Array.get(fields[i], 0));
+ }
+ }
+ }
+
+ private Config getTestConfigFile(String configFile) throws
FileNotFoundException, URISyntaxException {
+ if (!configFile.startsWith("/")) {
+ configFile = "/" + configFile;
+ }
+ URL resource = FakeRandomDataTest.class.getResource(configFile);
+ if (resource == null) {
+ throw new FileNotFoundException("Can't find config file: " +
configFile);
+ }
+ String path = Paths.get(resource.toURI()).toString();
+ Config config = ConfigFactory.parseFile(new File(path));
+ assert config.hasPath("schema");
+ return config.getConfig("schema");
+ }
+
+}
diff --git
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
b/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf
similarity index 53%
copy from
seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
copy to
seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf
index 2e27410ee..6a06dbf06 100644
---
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
+++
b/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf
@@ -6,7 +6,7 @@
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,38 +15,24 @@
# limitations under the License.
#
-env {
- spark.app.name = "SeaTunnelToClickHouseV2"
- spark.executor.instances = 2
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
-}
-
-source {
- FakeSource {
- result_table_name = "fake"
- field_name = "name,age"
- }
-}
-
-
-transform {
- sql {
- sql = "select name,age from fake"
- result_table_name = "sql"
- }
-}
-
-sink {
- ClickHouse {
- host = "139.198.158.103:8123"
- database = "default"
- table = "test_clickhouse_table_v2"
- fields = ["name", "age"]
- username = 'default'
- bulk_size = 20000
- retry_codes = [209, 210]
- retry = 3
+schema {
+ fields {
+ map = "map<string, map<string, string>>"
+ map_array = "map<string, map<string, array<int>>>"
+ array = "array<tinyint>"
+ string = string
+ boolean = boolean
+ tinyint = tinyint
+ smallint = smallint
+ int = int
+ bigint = bigint
+ float = float
+ double = double
+ decimal = "decimal(30, 8)"
+ null = "null"
+ bytes = bytes
+ date = date
+ time = time
+ timestamp = timestamp
}
-}
+}
\ No newline at end of file
diff --git
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
b/seatunnel-connectors-v2/connector-fake/src/test/resources/simple.schema.conf
similarity index 53%
copy from
seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
copy to
seatunnel-connectors-v2/connector-fake/src/test/resources/simple.schema.conf
index 2e27410ee..6716f00cd 100644
---
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
+++
b/seatunnel-connectors-v2/connector-fake/src/test/resources/simple.schema.conf
@@ -6,7 +6,7 @@
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,38 +15,23 @@
# limitations under the License.
#
-env {
- spark.app.name = "SeaTunnelToClickHouseV2"
- spark.executor.instances = 2
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
-}
-
-source {
- FakeSource {
- result_table_name = "fake"
- field_name = "name,age"
- }
-}
-
-
-transform {
- sql {
- sql = "select name,age from fake"
- result_table_name = "sql"
- }
-}
-
-sink {
- ClickHouse {
- host = "139.198.158.103:8123"
- database = "default"
- table = "test_clickhouse_table_v2"
- fields = ["name", "age"]
- username = 'default'
- bulk_size = 20000
- retry_codes = [209, 210]
- retry = 3
+schema {
+ fields {
+ map = "map<string, string>"
+ array = "array<tinyint>"
+ string = string
+ boolean = boolean
+ tinyint = tinyint
+ smallint = smallint
+ int = int
+ bigint = bigint
+ float = float
+ double = double
+ decimal = "decimal(30, 8)"
+ null = "null"
+ bytes = bytes
+ date = date
+ time = time
+ timestamp = timestamp
}
-}
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/assertion/fakesource_to_assert.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/assertion/fakesource_to_assert.conf
index d1e9c4583..b2fda476b 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/assertion/fakesource_to_assert.conf
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/assertion/fakesource_to_assert.conf
@@ -29,9 +29,13 @@ source {
# This is a example source plugin **only for test and demonstrate the
feature source plugin**
FakeSource {
result_table_name = "fake"
- field_name = "name,age"
- }
-
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
# please go to
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
}
@@ -57,11 +61,11 @@ sink {
},
{
rule_type = MIN_LENGTH
- rule_value = 3
+ rule_value = 10
},
{
rule_type = MAX_LENGTH
- rule_value = 20
+ rule_value = 10
}
]
},{
@@ -73,16 +77,16 @@ sink {
},
{
rule_type = MIN
- rule_value = 1
+ rule_value = 32767
},
{
rule_type = MAX
- rule_value = 100
+ rule_value = 2147483647
}
]
}
]
}
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Assert
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf
index 49be0920f..d9eb0ff86 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf
@@ -28,13 +28,17 @@ env {
source {
# This is a example source plugin **only for test and demonstrate the
feature source plugin**
- FakeSource {
- result_table_name = "fake"
- field_name = "name,age"
+ FakeSource {
+ result_table_name = "fake"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
}
-
+ }
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+ # please go to
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
}
transform {
@@ -43,12 +47,12 @@ transform {
}
# If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+ # please go to https://seatunnel.apache.org/docs/transform/sql
}
sink {
Console {}
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+ # please go to https://seatunnel.apache.org/docs/category/sink-v2
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
index 769c8760d..269b85d08 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
@@ -29,11 +29,16 @@ env {
source {
FakeSource {
result_table_name = "fake"
- field_name = "name,age"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+ # please go to
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
}
transform {
@@ -61,5 +66,5 @@ sink {
}
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
index 9f5fd0b17..5e1ea5c01 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
@@ -29,11 +29,16 @@ env {
source {
FakeSource {
result_table_name = "fake"
- field_name = "name,age"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+ # please go to
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
}
transform {
@@ -62,5 +67,5 @@ sink {
}
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
index ef83dfd4e..d4a8a745c 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
@@ -29,11 +29,16 @@ env {
source {
FakeSource {
result_table_name = "fake"
- field_name = "name,age"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+ # please go to
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
}
transform {
@@ -62,5 +67,5 @@ sink {
}
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
index b18b472f6..c118c33c8 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
@@ -29,11 +29,16 @@ env {
source {
FakeSource {
result_table_name = "fake"
- field_name = "name,age"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+ # please go to
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
}
transform {
@@ -61,5 +66,5 @@ sink {
}
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/LocalFile
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf
index 9e2d5ad96..c02c16145 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf
@@ -29,11 +29,16 @@ env {
source {
FakeSource {
result_table_name = "fake"
- field_name = "name,age"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+ # please go to
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
}
transform {
@@ -62,5 +67,5 @@ sink {
}
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/LocalFile
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf
index d162b101f..7d1a5d42c 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf
@@ -29,11 +29,16 @@ env {
source {
FakeSource {
result_table_name = "fake"
- field_name = "name,age"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+ # please go to
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
}
transform {
@@ -42,7 +47,7 @@ transform {
}
# If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+ # please go to https://seatunnel.apache.org/docs/transform/sql
}
sink {
@@ -62,5 +67,5 @@ sink {
}
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/LocalFile
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
index da1ae4936..4c5e0fe4e 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
@@ -29,13 +29,18 @@ env {
source {
FakeSource {
result_table_name = "fake"
- field_name = "name, age"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
}
-
# If you would like to get more information about how to configure
seatunnel and see full list of source plugins,
# please go to
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
}
+
transform {
sql {
sql = "select * from (values('root.ln.d1', '1660147200000',
'status,value', 'true,1001'), ('root.ln.d1', '1660233600000', 'status,value',
'false,1002')) t (device, `timestamp`, measurements, `values`)"
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
index 9640e19c2..0f732b63b 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
@@ -30,11 +30,15 @@ source {
# This is a example source plugin **only for test and demonstrate the
feature source plugin**
FakeSource {
result_table_name = "fake"
- field_name = "name"
+ schema = {
+ fields {
+ name = "string"
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+ # please go to
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
}
transform {
@@ -43,7 +47,7 @@ transform {
}
# If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+ # please go to https://seatunnel.apache.org/docs/transform/sql
}
sink {
@@ -57,5 +61,5 @@ sink {
}
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
index 6862abc04..4d65c569e 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
@@ -43,11 +43,11 @@ source {
transform {
# If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+ # please go to https://seatunnel.apache.org/docs/transform/sql
}
sink {
Console {}
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+ # please go to https://seatunnel.apache.org/docs/category/sink-v2
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf
index 7b8127620..f469338a5 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf
@@ -33,6 +33,12 @@ source {
# This is a example input plugin **only for test and demonstrate the feature
input plugin**
FakeSource {
result_table_name = "my_dataset"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
}
# You can also use other input plugins, such as hdfs
@@ -42,8 +48,8 @@ source {
# format = "json"
# }
- # If you would like to get more information about how to configure seatunnel
and see full list of input plugins,
- # please go to
https://seatunnel.apache.org/docs/spark/configuration/source-plugins/Fake
+ # If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
+ # please go to
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
}
transform {
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
index c4d1aabe5..40454bce0 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf
@@ -28,11 +28,16 @@ env {
source {
FakeSource {
result_table_name = "fake"
- field_name = "name,age"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+ # please go to
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
}
transform {
@@ -41,7 +46,7 @@ transform {
}
# If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/transform/sql
+ # please go to https://seatunnel.apache.org/docs/category/transform
}
sink {
@@ -60,5 +65,5 @@ sink {
}
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
index bdae80d74..550990eea 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf
@@ -28,11 +28,16 @@ env {
source {
FakeSource {
result_table_name = "fake"
- field_name = "name,age"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+ # please go to
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
}
transform {
@@ -41,7 +46,7 @@ transform {
}
# If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/transform/sql
+ # please go to https://seatunnel.apache.org/docs/category/transform
}
sink {
@@ -61,5 +66,5 @@ sink {
}
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
index b682d3831..2bf6afba6 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf
@@ -28,11 +28,16 @@ env {
source {
FakeSource {
result_table_name = "fake"
- field_name = "name,age"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+ # please go to
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
}
transform {
@@ -41,7 +46,7 @@ transform {
}
# If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/transform/sql
+ # please go to https://seatunnel.apache.org/docs/category/transform
}
sink {
@@ -61,5 +66,5 @@ sink {
}
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
index d257f81bb..f8dab0cbc 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf
@@ -28,11 +28,16 @@ env {
source {
FakeSource {
result_table_name = "fake"
- field_name = "name,age"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+ # please go to
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
}
transform {
@@ -41,7 +46,7 @@ transform {
}
# If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+ # please go to https://seatunnel.apache.org/docs/category/transform
}
sink {
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf
index b5d141212..edbad7ec6 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf
@@ -28,11 +28,16 @@ env {
source {
FakeSource {
result_table_name = "fake"
- field_name = "name,age"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
+ # please go to
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
}
transform {
@@ -41,7 +46,7 @@ transform {
}
# If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/transform/sql
+ # please go to https://seatunnel.apache.org/docs/category/transform
}
sink {
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf
index 733a48e61..41ff5e8f6 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf
@@ -28,11 +28,16 @@ env {
source {
FakeSource {
result_table_name = "fake"
- field_name = "name,age"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+ # please go to
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
}
transform {
@@ -41,7 +46,7 @@ transform {
}
# If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+ # please go to https://seatunnel.apache.org/docs/category/transform
}
sink {
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
index 9c7e521b7..503668487 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
@@ -28,7 +28,12 @@ env {
source {
FakeSource {
result_table_name = "fake"
- field_name = "name, age"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
}
# If you would like to get more information about how to configure
seatunnel and see full list of source plugins,
diff --git
a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
index feb394127..6a89b64a6 100644
---
a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
+++
b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
@@ -30,11 +30,16 @@ source {
# This is a example source plugin **only for test and demonstrate the
feature source plugin**
FakeSource {
result_table_name = "fake"
- field_name = "name,age"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+ # please go to https://seatunnel.apache.org/docs/category/source-v2
}
transform {
@@ -43,12 +48,12 @@ transform {
}
# If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+ # please go to https://seatunnel.apache.org/docs/category/transform
}
sink {
Console {}
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+ # please go to https://seatunnel.apache.org/docs/category/sink-v2
}
\ No newline at end of file
diff --git
a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf
b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf
index aed134290..d681985d0 100644
---
a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf
+++
b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf
@@ -30,11 +30,16 @@ source {
# This is a example source plugin **only for test and demonstrate the
feature source plugin**
FakeSource {
result_table_name = "fake"
- field_name = "name,age"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+ # please go to https://seatunnel.apache.org/docs/category/source-v2
}
transform {
@@ -43,7 +48,7 @@ transform {
}
# If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+ # please go to https://seatunnel.apache.org/docs/category/transform
}
sink {
@@ -53,5 +58,5 @@ sink {
}
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+ # please go to https://seatunnel.apache.org/docs/category/sink-v2
}
\ No newline at end of file
diff --git
a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf
b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf
index f7b790c40..d640281cc 100644
---
a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf
+++
b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf
@@ -30,11 +30,16 @@ source {
# This is a example source plugin **only for test and demonstrate the
feature source plugin**
FakeSource {
result_table_name = "fake"
- field_name = "name,age"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+ # please go to https://seatunnel.apache.org/docs/category/source-v2
}
transform {
@@ -43,7 +48,7 @@ transform {
sql = "select name,age from fake"
}
# If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+ # please go to https://seatunnel.apache.org/docs/category/transform
}
sink {
@@ -64,5 +69,5 @@ sink {
}
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
- # please go to
https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+ # please go to https://seatunnel.apache.org/docs/category/sink-v2
}
\ No newline at end of file
diff --git
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
index 2e27410ee..8d57e1f3b 100644
---
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
+++
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf
@@ -26,8 +26,15 @@ env {
source {
FakeSource {
result_table_name = "fake"
- field_name = "name,age"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
}
+ # If you would like to get more information about how to configure seatunnel
and see full list of input plugins,
+ # please go to https://seatunnel.apache.org/docs/category/source-v2
}
diff --git
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
index 45ab040b4..6e7755d8c 100644
---
a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++
b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
@@ -33,8 +33,27 @@ env {
source {
# This is a example input plugin **only for test and demonstrate the feature
input plugin**
FakeSource {
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_time = time
+ c_timestamp = timestamp
+ }
+ }
result_table_name = "fake"
- field_name = "name,age,timestamp"
}
# You can also use other input plugins, such as hdfs
@@ -45,7 +64,7 @@ source {
# }
# If you would like to get more information about how to configure seatunnel
and see full list of input plugins,
- # please go to
https://seatunnel.apache.org/docs/spark/configuration/source-plugins/Fake
+ # please go to https://seatunnel.apache.org/docs/category/source-v2
}
transform {
@@ -53,12 +72,12 @@ transform {
# you can also use other transform plugins, such as sql
sql {
- sql = "select name,age from fake"
+ sql = "select
c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes
from fake"
result_table_name = "sql"
}
# If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
- # please go to
https://seatunnel.apache.org/docs/spark/configuration/transform-plugins/Split
+ # please go to https://seatunnel.apache.org/docs/category/transform
}
sink {
@@ -72,5 +91,5 @@ sink {
# }
# If you would like to get more information about how to configure seatunnel
and see full list of output plugins,
- # please go to
https://seatunnel.apache.org/docs/spark/configuration/sink-plugins/Console
+ # please go to https://seatunnel.apache.org/docs/category/sink-v2
}
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java
index 4b6495e7f..f642cfe45 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java
@@ -57,12 +57,12 @@ public abstract class RowConverter<T> {
}
}
if (errors.size() > 0) {
- throw new UnsupportedOperationException("");
+ throw new UnsupportedOperationException(String.join(",", errors));
}
}
protected boolean validate(Object field, SeaTunnelDataType<?> dataType) {
- if (field == null) {
+ if (field == null || dataType.getSqlType() == SqlType.NULL) {
return true;
}
SqlType sqlType = dataType.getSqlType();