This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7b78952f06 [Fix] Fix the error msg when parse schema with unsupported type (#5790)
7b78952f06 is described below
commit 7b78952f0698ee7386c01cc827396fc4f692bf11
Author: Jia Fan <[email protected]>
AuthorDate: Tue Nov 7 10:48:56 2023 +0800
[Fix] Fix the error msg when parse schema with unsupported type (#5790)
---
.../catalog/SeaTunnelDataTypeConvertorUtil.java | 6 +-
.../table/catalog/schema/ReadonlyConfigParser.java | 213 ++++-----------------
.../SeaTunnelDataTypeConvertorUtilTest.java | 43 +++++
3 files changed, 88 insertions(+), 174 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
index 272878c749..e819077725 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
@@ -94,7 +94,11 @@ public class SeaTunnelDataTypeConvertorUtil {
if (column.startsWith(SqlType.DECIMAL.name())) {
return parseDecimalType(column);
}
- return parseRowType(columnStr);
+ if (column.trim().startsWith("{")) {
+ return parseRowType(columnStr);
+ }
+ throw new UnsupportedOperationException(
+ String.format("the type[%s] is not support", columnStr));
}
private static SeaTunnelDataType<?> parseRowType(String columnStr) {
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
index 8ed0a7a058..66621fbfa3 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
@@ -81,7 +81,7 @@ public class ReadonlyConfigParser implements TableSchemaParser<ReadonlyConfig> {
return tableSchemaBuilder.build();
}
- public class FieldParser implements
TableSchemaParser.FieldParser<ReadonlyConfig> {
+ private static class FieldParser implements
TableSchemaParser.FieldParser<ReadonlyConfig> {
@Override
public List<Column> parse(ReadonlyConfig schemaConfig) {
@@ -102,7 +102,7 @@ public class ReadonlyConfigParser implements TableSchemaParser<ReadonlyConfig> {
}
}
- public class ColumnParser implements
TableSchemaParser.ColumnParser<ReadonlyConfig> {
+ private static class ColumnParser implements
TableSchemaParser.ColumnParser<ReadonlyConfig> {
@Override
public List<Column> parse(ReadonlyConfig schemaConfig) {
@@ -150,7 +150,7 @@ public class ReadonlyConfigParser implements TableSchemaParser<ReadonlyConfig> {
}
}
- public class ConstraintKeyParser
+ private static class ConstraintKeyParser
implements TableSchemaParser.ConstraintKeyParser<ReadonlyConfig> {
@Override
@@ -184,37 +184,41 @@ public class ReadonlyConfigParser implements TableSchemaParser<ReadonlyConfig> {
TableSchemaOptions.ConstraintKeyOptions
.CONSTRAINT_KEY_COLUMNS)
.map(
-
constraintColumnMapList -> {
- return
constraintColumnMapList.stream()
-
.map(ReadonlyConfig::fromMap)
- .map(
-
constraintColumnConfig -> {
-
String columnName =
-
constraintColumnConfig
-
.getOptional(
-
TableSchemaOptions
-
.ConstraintKeyOptions
-
.CONSTRAINT_KEY_COLUMN_NAME)
-
.orElseThrow(
-
() ->
-
new IllegalArgumentException(
-
"schema.constraintKeys.constraintColumns.* config need option [columnName],
please correct your config first"));
-
ConstraintKey
-
.ColumnSortType
-
columnSortType =
-
constraintColumnConfig
-
.get(
-
TableSchemaOptions
-
.ConstraintKeyOptions
-
.CONSTRAINT_KEY_COLUMN_SORT_TYPE);
-
return ConstraintKey
-
.ConstraintKeyColumn
-
.of(
-
columnName,
-
columnSortType);
- })
-
.collect(Collectors.toList());
- })
+
constraintColumnMapList ->
+
constraintColumnMapList.stream()
+ .map(
+
ReadonlyConfig
+
::fromMap)
+ .map(
+
constraintColumnConfig -> {
+
String
+
columnName =
+
constraintColumnConfig
+
.getOptional(
+
TableSchemaOptions
+
.ConstraintKeyOptions
+
.CONSTRAINT_KEY_COLUMN_NAME)
+
.orElseThrow(
+
() ->
+
new IllegalArgumentException(
+
"schema.constraintKeys.constraintColumns.* config need option [columnName],
please correct your config first"));
+
ConstraintKey
+
.ColumnSortType
+
columnSortType =
+
constraintColumnConfig
+
.get(
+
TableSchemaOptions
+
.ConstraintKeyOptions
+
.CONSTRAINT_KEY_COLUMN_SORT_TYPE);
+
return ConstraintKey
+
.ConstraintKeyColumn
+
.of(
+
columnName,
+
columnSortType);
+
})
+
.collect(
+
Collectors
+
.toList()))
.orElseThrow(
() ->
new
IllegalArgumentException(
@@ -225,7 +229,8 @@ public class ReadonlyConfigParser implements TableSchemaParser<ReadonlyConfig> {
}
}
- public class PrimaryKeyParser implements
TableSchemaParser.PrimaryKeyParser<ReadonlyConfig> {
+ private static class PrimaryKeyParser
+ implements TableSchemaParser.PrimaryKeyParser<ReadonlyConfig> {
@Override
public PrimaryKey parse(ReadonlyConfig schemaConfig) {
@@ -249,142 +254,4 @@ public class ReadonlyConfigParser implements TableSchemaParser<ReadonlyConfig> {
return new PrimaryKey(primaryKeyName, columns);
}
}
-
- /**
- * Parse columns from columns config.
- *
- * <pre>
- * columns = [
- * {
- * name = "name"
- * type = "string"
- * columnLength = 0
- * nullable = true
- * defaultValue = null
- * comment = "name"
- * },
- * {
- * name = "age"
- * type = "int"
- * columnLength = 0
- * nullable = true
- * defaultValue = null
- * comment = "age"
- * }
- * ]
- * </pre>
- *
- * @param columnConfig columns config
- * @return columns
- */
- private Column parseFromColumn(ReadonlyConfig columnConfig) {
- String name =
- columnConfig
- .getOptional(TableSchemaOptions.ColumnOptions.NAME)
- .orElseThrow(
- () ->
- new IllegalArgumentException(
- "schema.columns.* config need
option [name], please correct your config first"));
- SeaTunnelDataType<?> seaTunnelDataType =
- columnConfig
- .getOptional(TableSchemaOptions.ColumnOptions.TYPE)
-
.map(SeaTunnelDataTypeConvertorUtil::deserializeSeaTunnelDataType)
- .orElseThrow(
- () ->
- new IllegalArgumentException(
- "schema.columns.* config need
option [type], please correct your config first"));
-
- Integer columnLength =
columnConfig.get(TableSchemaOptions.ColumnOptions.COLUMN_LENGTH);
- Boolean nullable =
columnConfig.get(TableSchemaOptions.ColumnOptions.NULLABLE);
- Object defaultValue =
columnConfig.get(TableSchemaOptions.ColumnOptions.DEFAULT_VALUE);
- String comment =
columnConfig.get(TableSchemaOptions.ColumnOptions.COMMENT);
- return PhysicalColumn.of(
- name, seaTunnelDataType, columnLength, nullable, defaultValue,
comment);
- }
-
- /**
- * Parse primary key from primary key config.
- *
- * <pre>
- * primaryKey {
- * name = "primary_key"
- * columnNames = ["name", "age"]
- * }
- * </pre>
- *
- * @param primaryKeyConfig primary key config
- * @return primary key
- */
- private PrimaryKey parsePrimaryKey(Map<String, Object> primaryKeyConfig) {
- if (!primaryKeyConfig.containsKey(
-
TableSchemaOptions.PrimaryKeyOptions.PRIMARY_KEY_NAME.key())
- || !primaryKeyConfig.containsKey(
-
TableSchemaOptions.PrimaryKeyOptions.PRIMARY_KEY_COLUMNS.key())) {
- throw new IllegalArgumentException(
- "Schema config need option [primaryKey.name,
primaryKey.columnNames], please correct your config first");
- }
-
- String primaryKeyName =
- (String)
- primaryKeyConfig.get(
-
TableSchemaOptions.PrimaryKeyOptions.PRIMARY_KEY_NAME.key());
- List<String> columns =
- (List<String>)
- primaryKeyConfig.get(
-
TableSchemaOptions.PrimaryKeyOptions.PRIMARY_KEY_COLUMNS.key());
- return new PrimaryKey(primaryKeyName, columns);
- }
-
- private ConstraintKey parseConstraintKeys(ReadonlyConfig
constraintKeyConfig) {
- String constraintName =
- constraintKeyConfig
-
.getOptional(TableSchemaOptions.ConstraintKeyOptions.CONSTRAINT_KEY_NAME)
- .orElseThrow(
- () ->
- new IllegalArgumentException(
- "schema.constraintKeys.*
config need option [constraintName], please correct your config first"));
- ConstraintKey.ConstraintType constraintType =
- constraintKeyConfig
-
.getOptional(TableSchemaOptions.ConstraintKeyOptions.CONSTRAINT_KEY_TYPE)
- .orElseThrow(
- () ->
- new IllegalArgumentException(
- "schema.constraintKeys.*
config need option [constraintType], please correct your config first"));
- List<ConstraintKey.ConstraintKeyColumn> columns =
- constraintKeyConfig
-
.getOptional(TableSchemaOptions.ConstraintKeyOptions.CONSTRAINT_KEY_COLUMNS)
- .map(
- constraintColumnMapList -> {
- return constraintColumnMapList.stream()
- .map(ReadonlyConfig::fromMap)
- .map(
- constraintColumnConfig -> {
- String columnName =
-
constraintColumnConfig
-
.getOptional(
-
TableSchemaOptions
-
.ConstraintKeyOptions
-
.CONSTRAINT_KEY_COLUMN_NAME)
-
.orElseThrow(
-
() ->
-
new IllegalArgumentException(
-
"schema.constraintKeys.constraintColumns.* config need option
[columnName], please correct your config first"));
-
ConstraintKey.ColumnSortType
- columnSortType
=
-
constraintColumnConfig.get(
-
TableSchemaOptions
-
.ConstraintKeyOptions
-
.CONSTRAINT_KEY_COLUMN_SORT_TYPE);
- return
ConstraintKey.ConstraintKeyColumn.of(
- columnName,
columnSortType);
- })
- .collect(Collectors.toList());
- })
- .orElseThrow(
- () ->
- new IllegalArgumentException(
- "schema.constraintKeys.*
config need option [columns], please correct your config first"));
-
- return ConstraintKey.of(constraintType, constraintName, columns);
- }
}
diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java
new file mode 100644
index 0000000000..cbcc0f1bd9
--- /dev/null
+++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class SeaTunnelDataTypeConvertorUtilTest {
+
+ @Test
+ void testParseWithUnsupportedType() {
+
+ UnsupportedOperationException exception =
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () ->
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType("uuid"));
+ Assertions.assertEquals("the type[uuid] is not support",
exception.getMessage());
+
+ RuntimeException exception2 =
+ Assertions.assertThrows(
+ RuntimeException.class,
+ () ->
+
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+ "{uuid}"));
+ Assertions.assertEquals(
+ "String json deserialization exception.{uuid}",
exception2.getMessage());
+ }
+}