This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a9bc5ca58d [Feature][API] Support hocon style declare row type in
generic type (#6187)
a9bc5ca58d is described below
commit a9bc5ca58ddc0195806b323e2dd949b043db7d17
Author: Chengyu Yan <[email protected]>
AuthorDate: Wed Jan 17 22:55:31 2024 +0800
[Feature][API] Support hocon style declare row type in generic type (#6187)
---
docs/en/concept/schema-feature.md | 70 +++++++++++++------
release-note.md | 1 +
.../catalog/SeaTunnelDataTypeConvertorUtil.java | 79 +++++++++++++++-------
.../api/table/catalog/CatalogTableUtilTest.java | 37 ++++++++++
.../SeaTunnelDataTypeConvertorUtilTest.java | 22 ++++--
.../test/resources/conf/generic_row.schema.conf | 25 +++++++
.../seatunnel/e2e/connector/fake/FakeIT.java | 3 +
.../resources/fake_generic_row_type_to_assert.conf | 66 ++++++++++++++++++
8 files changed, 257 insertions(+), 46 deletions(-)
diff --git a/docs/en/concept/schema-feature.md
b/docs/en/concept/schema-feature.md
index 4c95e79fef..cee8c1a5ff 100644
--- a/docs/en/concept/schema-feature.md
+++ b/docs/en/concept/schema-feature.md
@@ -69,25 +69,57 @@ columns = [
#### What type supported at now
-| Data type | Value type in Java | Description
|
-|:----------|:---------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| string | `java.lang.String` | string
|
-| boolean | `java.lang.Boolean` | boolean
|
-| tinyint | `java.lang.Byte` | -128 to 127
regular. 0 to 255 unsigned*. Specify the maximum number of digits in
parentheses.
|
-| smallint | `java.lang.Short` | -32768 to
32767 General. 0 to 65535 unsigned*. Specify the maximum number of digits in
parentheses.
|
-| int | `java.lang.Integer` | All numbers
from -2,147,483,648 to 2,147,483,647 are allowed.
|
-| bigint | `java.lang.Long` | All numbers
between -9,223,372,036,854,775,808 and 9,223,372,036,854,775,807 are allowed.
|
-| float | `java.lang.Float` |
Float-precision numeric data from -1.79E+308 to 1.79E+308.
|
-| double | `java.lang.Double` | Double
precision floating point. Handle most decimals.
|
-| decimal | `java.math.BigDecimal` | DOUBLE type
stored as a string, allowing a fixed decimal point.
|
-| null | `java.lang.Void` | null
|
-| bytes | `byte[]` | bytes.
|
-| date | `java.time.LocalDate` | Only the
date is stored. From January 1, 0001 to December 31, 9999.
|
-| time | `java.time.LocalTime` | Only store
time. Accuracy is 100 nanoseconds.
|
-| timestamp | `java.time.LocalDateTime` | Stores a
unique number that is updated whenever a row is created or modified. timestamp
is based on the internal clock and does not correspond to real time. There can
only be one timestamp variable per table.
|
-| row | `org.apache.seatunnel.api.table.type.SeaTunnelRow` | Row
type,can be nested.
|
-| map | `java.util.Map` | A Map is an
object that maps keys to values. The key type includes `int` `string` `boolean`
`tinyint` `smallint` `bigint` `float` `double` `decimal` `date` `time`
`timestamp` `null` , and the value type includes `int` `string` `boolean`
`tinyint` `smallint` `bigint` `float` `double` `decimal` `date` `time`
`timestamp` `null` `array` `map`. |
-| array | `ValueType[]` | A array is
a data type that represents a collection of elements. The element type includes
`int` `string` `boolean` `tinyint` `smallint` `bigint` `float` `double` `array`
`map`.
|
+| Data type | Value type in Java | Description
|
+|:----------|:---------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| string | `java.lang.String` | string
|
+| boolean | `java.lang.Boolean` | boolean
|
+| tinyint | `java.lang.Byte` | -128 to 127
regular. 0 to 255 unsigned*. Specify the maximum number of digits in
parentheses.
|
+| smallint | `java.lang.Short` | -32768 to
32767 General. 0 to 65535 unsigned*. Specify the maximum number of digits in
parentheses.
|
+| int | `java.lang.Integer` | All numbers
from -2,147,483,648 to 2,147,483,647 are allowed.
|
+| bigint | `java.lang.Long` | All numbers
between -9,223,372,036,854,775,808 and 9,223,372,036,854,775,807 are allowed.
|
+| float | `java.lang.Float` |
Single-precision floating-point numeric data, approximately -3.4E+38 to 3.4E+38.
|
+| double | `java.lang.Double` | Double
precision floating point. Handle most decimals.
|
+| decimal | `java.math.BigDecimal` | DOUBLE type
stored as a string, allowing a fixed decimal point.
|
+| null | `java.lang.Void` | null
|
+| bytes | `byte[]` | bytes.
|
+| date | `java.time.LocalDate` | Only the
date is stored. From January 1, 0001 to December 31, 9999.
|
+| time | `java.time.LocalTime` | Only store
time. Accuracy is 100 nanoseconds.
|
+| timestamp | `java.time.LocalDateTime` | Stores a
unique number that is updated whenever a row is created or modified. timestamp
is based on the internal clock and does not correspond to real time. There can
only be one timestamp variable per table.
|
+| row | `org.apache.seatunnel.api.table.type.SeaTunnelRow` | Row
type, can be nested.
|
+| map | `java.util.Map` | A Map is an
object that maps keys to values. The key type includes `int` `string` `boolean`
`tinyint` `smallint` `bigint` `float` `double` `decimal` `date` `time`
`timestamp` `null` , and the value type includes `int` `string` `boolean`
`tinyint` `smallint` `bigint` `float` `double` `decimal` `date` `time`
`timestamp` `null` `array` `map` `row`. |
+| array | `ValueType[]` | An array is
a data type that represents a collection of elements. The element type includes
`int` `string` `boolean` `tinyint` `smallint` `bigint` `float` `double` `array`
`map` `row`.
|
+
+#### How to declare supported types
+
+SeaTunnel provides a simple and direct way to declare basic types. The keyword
names for basic types can be used directly as type declarations, and SeaTunnel
is case-insensitive to type keywords. Basic type keywords include `string`,
`boolean`, `tinyint`, `smallint`, `int`, `bigint`, `float`, `double`, `date`,
`time`, `timestamp`, and `null`. For example, if you need to declare a field
with integer type, you can simply define the field as `int` or `"int"`.
+
+When declaring complex types (such as **decimal**, **array**, **map**, and
**row**), pay attention to specific considerations.
+- When declaring a decimal type, precision and scale settings are required,
and the type definition follows the format `decimal(precision, scale)`. It's
essential to emphasize that the declaration of the decimal type must be
enclosed in `"`; you cannot use the type name directly, as with basic types.
For example, when declaring a decimal field with precision 10 and scale 2, you
specify the field type as `"decimal(10,2)"`.
+- When declaring an array type, you need to specify the element type, and the
type definition follows the format `array<T>`, where `T` represents the element
type and can be any type supported by SeaTunnel. Similar to the decimal type
declaration, it must also be enclosed in `"`. For example, when declaring a field
with an array of integers, you specify the field type as `"array<int>"`.
+- When declaring a map type, you need to specify the key and value types. The
map type definition follows the format `map<K,V>`, where `K` represents the key
type and `V` represents the value type. `K` can be any basic type, and `V` can
be any type supported by SeaTunnel. Similar to previous type declarations, the
map type declaration must be enclosed in double quotes. For example, when
declaring a field with map type, where the key type is string and the value
type is integer, you can d [...]
+- When declaring a row type, you need to define a
[HOCON](https://github.com/lightbend/config/blob/main/HOCON.md) object to
describe the fields and their types. The field types can be any type supported
by SeaTunnel. For example, when declaring a row type containing an integer
field `a` and a string field `b`, you can declare it as `{a = int, b =
string}`. Enclosing the definition in `"` as a string is also acceptable, so
`"{a = int, b = string}"` is equivalent to `{a = int, b = string}` [...]
+
+Here is an example of complex type declarations:
+
+```hocon
+schema {
+ fields {
+ c_decimal = "decimal(10, 2)"
+ c_array = "array<{c_0 = int, c_1 = string}>"
+ c_row = {
+ c_int = int
+ c_string = string
+ c_row = {
+ c_int = int
+ }
+ }
+ # Hocon style declare row type in generic type
+ map0 = "map<string, {c_int = int, c_string = string, c_row = {c_int =
int}}>"
+ # Json style declare row type in generic type
+ map1 = "map<string, {\"c_int\":\"int\", \"c_string\":\"string\",
\"c_row\":{\"c_int\":\"int\"}}>"
+ }
+}
+```
### PrimaryKey
diff --git a/release-note.md b/release-note.md
index 0ded29987d..831018d273 100644
--- a/release-note.md
+++ b/release-note.md
@@ -165,6 +165,7 @@
- [Core] [Shade] Add guava shade module (#4358)
- [Core] [Spark] Support SeaTunnel Time Type (#5188)
- [Core] [Flink] Support Decimal Type with configurable precision and scale
(#5419)
+- [Core] [API] Support hocon style declare row type in generic type (#6187)
### Connector-V2
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
index f15980f41c..ce71a46051 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
@@ -17,7 +17,10 @@
package org.apache.seatunnel.api.table.catalog;
-import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigObject;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
@@ -29,9 +32,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.exception.CommonError;
-import org.apache.seatunnel.common.utils.JsonUtils;
-
-import java.util.Map;
public class SeaTunnelDataTypeConvertorUtil {
@@ -87,13 +87,13 @@ public class SeaTunnelDataTypeConvertorUtil {
private static SeaTunnelDataType<?> parseComplexDataType(String field,
String columnStr) {
String column = columnStr.toUpperCase().replace(" ", "");
if (column.startsWith(SqlType.MAP.name())) {
- return parseMapType(field, column);
+ return parseMapType(field, columnStr);
}
if (column.startsWith(SqlType.ARRAY.name())) {
- return parseArrayType(field, column);
+ return parseArrayType(field, columnStr);
}
if (column.startsWith(SqlType.DECIMAL.name())) {
- return parseDecimalType(column);
+ return parseDecimalType(columnStr);
}
if (column.trim().startsWith("{")) {
return parseRowType(columnStr);
@@ -102,31 +102,64 @@ public class SeaTunnelDataTypeConvertorUtil {
}
private static SeaTunnelDataType<?> parseRowType(String columnStr) {
- ObjectNode jsonNodes = JsonUtils.parseObject(columnStr);
- Map<String, String> fieldsMap = JsonUtils.toStringMap(jsonNodes);
- String[] fieldsName = new String[fieldsMap.size()];
- SeaTunnelDataType<?>[] seaTunnelDataTypes = new
SeaTunnelDataType<?>[fieldsMap.size()];
- int i = 0;
- for (Map.Entry<String, String> entry : fieldsMap.entrySet()) {
- fieldsName[i] = entry.getKey();
- seaTunnelDataTypes[i] =
deserializeSeaTunnelDataType(entry.getKey(), entry.getValue());
- i++;
+ String confPayload = "{conf = " + columnStr + "}";
+ Config conf;
+ try {
+ conf = ConfigFactory.parseString(confPayload);
+ } catch (RuntimeException e) {
+ throw new IllegalArgumentException(
+ String.format("HOCON Config parse from %s failed.",
confPayload), e);
+ }
+ return parseRowType(conf.getObject("conf"));
+ }
+
+ private static SeaTunnelDataType<?> parseRowType(ConfigObject conf) {
+ String[] fieldNames = new String[conf.size()];
+ SeaTunnelDataType<?>[] fieldTypes = new SeaTunnelDataType[conf.size()];
+ conf.keySet().toArray(fieldNames);
+
+ for (int idx = 0; idx < fieldNames.length; idx++) {
+ String fieldName = fieldNames[idx];
+ ConfigValue typeVal = conf.get(fieldName);
+ switch (typeVal.valueType()) {
+ case STRING:
+ {
+ fieldTypes[idx] =
+ deserializeSeaTunnelDataType(
+ fieldNames[idx], (String)
typeVal.unwrapped());
+ }
+ break;
+ case OBJECT:
+ {
+ fieldTypes[idx] = parseRowType((ConfigObject) typeVal);
+ }
+ break;
+ case LIST:
+ case NUMBER:
+ case BOOLEAN:
+ case NULL:
+ default:
+ throw new IllegalArgumentException(
+ String.format(
+ "Unsupported parse SeaTunnel Type from
'%s'.",
+ typeVal.unwrapped()));
+ }
}
- return new SeaTunnelRowType(fieldsName, seaTunnelDataTypes);
+ return new SeaTunnelRowType(fieldNames, fieldTypes);
}
private static SeaTunnelDataType<?> parseMapType(String field, String
columnStr) {
- String genericType = getGenericType(columnStr);
+ String genericType = getGenericType(columnStr).trim();
int index =
- genericType.startsWith(SqlType.DECIMAL.name())
+ genericType.toUpperCase().startsWith(SqlType.DECIMAL.name())
?
// if map key is decimal, we should find the index of
second ','
genericType.indexOf(",", genericType.indexOf(",") + 1)
:
// if map key is not decimal, we should find the index
of first ','
genericType.indexOf(",");
- String keyGenericType = genericType.substring(0, index);
- String valueGenericType = genericType.substring(index + 1);
+ String keyGenericType = genericType.substring(0, index).trim();
+ String valueGenericType = genericType.substring(index + 1).trim();
return new MapType<>(
deserializeSeaTunnelDataType(field, keyGenericType),
deserializeSeaTunnelDataType(field, valueGenericType));
@@ -138,7 +171,7 @@ public class SeaTunnelDataTypeConvertorUtil {
}
private static SeaTunnelDataType<?> parseArrayType(String field, String
columnStr) {
- String genericType = getGenericType(columnStr);
+ String genericType = getGenericType(columnStr).trim();
SeaTunnelDataType<?> dataType = deserializeSeaTunnelDataType(field,
genericType);
switch (dataType.getSqlType()) {
case STRING:
@@ -158,7 +191,7 @@ public class SeaTunnelDataTypeConvertorUtil {
case DOUBLE:
return ArrayType.DOUBLE_ARRAY_TYPE;
default:
- throw CommonError.unsupportedDataType("SeaTunnel", columnStr,
field);
+ throw CommonError.unsupportedDataType("SeaTunnel",
genericType, field);
}
}
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
index d3e84205c4..9b579fbd90 100644
---
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.utils.SeaTunnelException;
@@ -137,6 +138,42 @@ public class CatalogTableUtilTest {
Thread.currentThread().getContextClassLoader()));
}
+ @Test
+ public void testGenericRowSchemaTest() throws FileNotFoundException,
URISyntaxException {
+ String path = getTestConfigFile("/conf/generic_row.schema.conf");
+ Config config = ConfigFactory.parseFile(new File(path));
+ SeaTunnelRowType seaTunnelRowType =
+ CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
+ Assertions.assertNotNull(seaTunnelRowType);
+ Assertions.assertArrayEquals(
+ new String[] {"map0", "map1"},
seaTunnelRowType.getFieldNames());
+
+ MapType<String, SeaTunnelRowType> mapType0 =
+ (MapType<String, SeaTunnelRowType>)
seaTunnelRowType.getFieldType(0);
+ MapType<String, SeaTunnelRowType> mapType1 =
+ (MapType<String, SeaTunnelRowType>)
seaTunnelRowType.getFieldType(1);
+ Assertions.assertNotNull(mapType0);
+ Assertions.assertNotNull(mapType1);
+ Assertions.assertEquals(BasicType.STRING_TYPE, mapType0.getKeyType());
+
+ SeaTunnelRowType expectedVal =
+ new SeaTunnelRowType(
+ new String[] {"c_int", "c_string", "c_row"},
+ new SeaTunnelDataType[] {
+ BasicType.INT_TYPE,
+ BasicType.STRING_TYPE,
+ new SeaTunnelRowType(
+ new String[] {"c_int"},
+ new SeaTunnelDataType[]
{BasicType.INT_TYPE})
+ });
+ SeaTunnelRowType mapType0ValType =
+ (SeaTunnelRowType) ((SeaTunnelDataType<?>)
mapType0.getValueType());
+ Assertions.assertEquals(expectedVal, mapType0ValType);
+ SeaTunnelRowType mapType1ValType =
+ (SeaTunnelRowType) ((SeaTunnelDataType<?>)
mapType1.getValueType());
+ Assertions.assertEquals(expectedVal, mapType1ValType);
+ }
+
public static String getTestConfigFile(String configFile)
throws FileNotFoundException, URISyntaxException {
URL resource = CatalogTableUtilTest.class.getResource(configFile);
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java
index ac1717a3ab..f126722d83 100644
---
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java
@@ -66,13 +66,27 @@ public class SeaTunnelDataTypeConvertorUtilTest {
"ErrorCode:[COMMON-07], ErrorDescription:['SeaTunnel'
unsupported data type 'uuid' of 'test']",
exception4.getMessage());
- RuntimeException exception5 =
+ IllegalArgumentException exception5 =
Assertions.assertThrows(
- RuntimeException.class,
+ IllegalArgumentException.class,
() ->
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
"test", "{uuid}"));
- Assertions.assertEquals(
- "String json deserialization exception.{uuid}",
exception5.getMessage());
+ String expectedMsg5 =
+ String.format("HOCON Config parse from %s failed.", "{conf =
{uuid}}");
+ Assertions.assertEquals(expectedMsg5, exception5.getMessage());
+
+ String invalidTypeDeclaration = "[e]";
+ IllegalArgumentException exception6 =
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+ "test",
+ String.format("{c_0 = %s}",
invalidTypeDeclaration)));
+ String expectedMsg6 =
+ String.format(
+ "Unsupported parse SeaTunnel Type from '%s'.",
invalidTypeDeclaration);
+ Assertions.assertEquals(expectedMsg6, exception6.getMessage());
}
}
diff --git a/seatunnel-api/src/test/resources/conf/generic_row.schema.conf
b/seatunnel-api/src/test/resources/conf/generic_row.schema.conf
new file mode 100644
index 0000000000..168db5dff9
--- /dev/null
+++ b/seatunnel-api/src/test/resources/conf/generic_row.schema.conf
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+schema {
+ fields {
+ # Hocon style declare row type in generic type
+ map0 = "map<string, {c_int = int, c_string = string, c_row = {c_int =
int}}>"
+ # Json style declare row type in generic type
+ map1 = "map<string, {\"c_int\":\"int\", \"c_string\":\"string\",
\"c_row\":{\"c_int\":\"int\"}}>"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeIT.java
index baafb6e33c..d3d187b172 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FakeIT.java
@@ -38,5 +38,8 @@ public class FakeIT extends TestSuiteBase {
Container.ExecResult fakeWithTemplate =
container.executeJob("/fake_to_assert_with_template.conf");
Assertions.assertEquals(0, fakeWithTemplate.getExitCode());
+ Container.ExecResult fakeComplex =
+ container.executeJob("/fake_generic_row_type_to_assert.conf");
+ Assertions.assertEquals(0, fakeComplex.getExitCode());
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_generic_row_type_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_generic_row_type_to_assert.conf
new file mode 100644
index 0000000000..8bdf07ef47
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_generic_row_type_to_assert.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = BATCH
+ # checkpoint.interval = 10000
+}
+
+source {
+ FakeSource {
+ row.num = 1
+ schema = {
+ fields {
+ c_0 = "map<string, {c_int=int\nc_string=string}>"
+ c_1 = "map<string, {c_int=int,c_string=string}>"
+ c_2 = "map<string, {c_int=int,c_string=string,c_row={c_int=int}}>"
+ c_3 = "map<string, {\"c_int\":\"int\",\"c_string\":\"string\"}>"
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink{
+ Assert {
+ source_table_name = "fake"
+ rules =
+ {
+ catalog_table_rule {
+ column_rule = [
+ {
+ name = "c_0"
+ type = "map<string, {c_int=int\nc_string=string}>"
+ }
+ {
+ name = "c_1"
+ type = "map<string, {c_int=int,c_string=string}>"
+ }
+ {
+ name = "c_2"
+ type = "map<string,
{c_int=int,c_string=string,c_row={c_int=int}}>"
+ }
+ {
+ name = "c_3"
+ type = "map<string, {\"c_int\":\"int\",\"c_string\":\"string\"}>"
+ }
+ ]
+ }
+ }
+ }
+}
\ No newline at end of file