This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7b78952f06 [Fix] Fix the error msg when parse schema with unsupported 
type (#5790)
7b78952f06 is described below

commit 7b78952f0698ee7386c01cc827396fc4f692bf11
Author: Jia Fan <[email protected]>
AuthorDate: Tue Nov 7 10:48:56 2023 +0800

    [Fix] Fix the error msg when parse schema with unsupported type (#5790)
---
 .../catalog/SeaTunnelDataTypeConvertorUtil.java    |   6 +-
 .../table/catalog/schema/ReadonlyConfigParser.java | 213 ++++-----------------
 .../SeaTunnelDataTypeConvertorUtilTest.java        |  43 +++++
 3 files changed, 88 insertions(+), 174 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
index 272878c749..e819077725 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
@@ -94,7 +94,11 @@ public class SeaTunnelDataTypeConvertorUtil {
         if (column.startsWith(SqlType.DECIMAL.name())) {
             return parseDecimalType(column);
         }
-        return parseRowType(columnStr);
+        if (column.trim().startsWith("{")) {
+            return parseRowType(columnStr);
+        }
+        throw new UnsupportedOperationException(
+                String.format("the type[%s] is not support", columnStr));
     }
 
     private static SeaTunnelDataType<?> parseRowType(String columnStr) {
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
index 8ed0a7a058..66621fbfa3 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/ReadonlyConfigParser.java
@@ -81,7 +81,7 @@ public class ReadonlyConfigParser implements 
TableSchemaParser<ReadonlyConfig> {
         return tableSchemaBuilder.build();
     }
 
-    public class FieldParser implements 
TableSchemaParser.FieldParser<ReadonlyConfig> {
+    private static class FieldParser implements 
TableSchemaParser.FieldParser<ReadonlyConfig> {
 
         @Override
         public List<Column> parse(ReadonlyConfig schemaConfig) {
@@ -102,7 +102,7 @@ public class ReadonlyConfigParser implements 
TableSchemaParser<ReadonlyConfig> {
         }
     }
 
-    public class ColumnParser implements 
TableSchemaParser.ColumnParser<ReadonlyConfig> {
+    private static class ColumnParser implements 
TableSchemaParser.ColumnParser<ReadonlyConfig> {
 
         @Override
         public List<Column> parse(ReadonlyConfig schemaConfig) {
@@ -150,7 +150,7 @@ public class ReadonlyConfigParser implements 
TableSchemaParser<ReadonlyConfig> {
         }
     }
 
-    public class ConstraintKeyParser
+    private static class ConstraintKeyParser
             implements TableSchemaParser.ConstraintKeyParser<ReadonlyConfig> {
 
         @Override
@@ -184,37 +184,41 @@ public class ReadonlyConfigParser implements 
TableSchemaParser<ReadonlyConfig> {
                                                         
TableSchemaOptions.ConstraintKeyOptions
                                                                 
.CONSTRAINT_KEY_COLUMNS)
                                                 .map(
-                                                        
constraintColumnMapList -> {
-                                                            return 
constraintColumnMapList.stream()
-                                                                    
.map(ReadonlyConfig::fromMap)
-                                                                    .map(
-                                                                            
constraintColumnConfig -> {
-                                                                               
 String columnName =
-                                                                               
         constraintColumnConfig
-                                                                               
                 .getOptional(
-                                                                               
                         TableSchemaOptions
-                                                                               
                                 .ConstraintKeyOptions
-                                                                               
                                 .CONSTRAINT_KEY_COLUMN_NAME)
-                                                                               
                 .orElseThrow(
-                                                                               
                         () ->
-                                                                               
                                 new IllegalArgumentException(
-                                                                               
                                         
"schema.constraintKeys.constraintColumns.* config need option [columnName], 
please correct your config first"));
-                                                                               
 ConstraintKey
-                                                                               
                 .ColumnSortType
-                                                                               
         columnSortType =
-                                                                               
                 constraintColumnConfig
-                                                                               
                         .get(
-                                                                               
                                 TableSchemaOptions
-                                                                               
                                         .ConstraintKeyOptions
-                                                                               
                                         .CONSTRAINT_KEY_COLUMN_SORT_TYPE);
-                                                                               
 return ConstraintKey
-                                                                               
         .ConstraintKeyColumn
-                                                                               
         .of(
-                                                                               
                 columnName,
-                                                                               
                 columnSortType);
-                                                                            })
-                                                                    
.collect(Collectors.toList());
-                                                        })
+                                                        
constraintColumnMapList ->
+                                                                
constraintColumnMapList.stream()
+                                                                        .map(
+                                                                               
 ReadonlyConfig
+                                                                               
         ::fromMap)
+                                                                        .map(
+                                                                               
 constraintColumnConfig -> {
+                                                                               
     String
+                                                                               
             columnName =
+                                                                               
                     constraintColumnConfig
+                                                                               
                             .getOptional(
+                                                                               
                                     TableSchemaOptions
+                                                                               
                                             .ConstraintKeyOptions
+                                                                               
                                             .CONSTRAINT_KEY_COLUMN_NAME)
+                                                                               
                             .orElseThrow(
+                                                                               
                                     () ->
+                                                                               
                                             new IllegalArgumentException(
+                                                                               
                                                     
"schema.constraintKeys.constraintColumns.* config need option [columnName], 
please correct your config first"));
+                                                                               
     ConstraintKey
+                                                                               
                     .ColumnSortType
+                                                                               
             columnSortType =
+                                                                               
                     constraintColumnConfig
+                                                                               
                             .get(
+                                                                               
                                     TableSchemaOptions
+                                                                               
                                             .ConstraintKeyOptions
+                                                                               
                                             .CONSTRAINT_KEY_COLUMN_SORT_TYPE);
+                                                                               
     return ConstraintKey
+                                                                               
             .ConstraintKeyColumn
+                                                                               
             .of(
+                                                                               
                     columnName,
+                                                                               
                     columnSortType);
+                                                                               
 })
+                                                                        
.collect(
+                                                                               
 Collectors
+                                                                               
         .toList()))
                                                 .orElseThrow(
                                                         () ->
                                                                 new 
IllegalArgumentException(
@@ -225,7 +229,8 @@ public class ReadonlyConfigParser implements 
TableSchemaParser<ReadonlyConfig> {
         }
     }
 
-    public class PrimaryKeyParser implements 
TableSchemaParser.PrimaryKeyParser<ReadonlyConfig> {
+    private static class PrimaryKeyParser
+            implements TableSchemaParser.PrimaryKeyParser<ReadonlyConfig> {
 
         @Override
         public PrimaryKey parse(ReadonlyConfig schemaConfig) {
@@ -249,142 +254,4 @@ public class ReadonlyConfigParser implements 
TableSchemaParser<ReadonlyConfig> {
             return new PrimaryKey(primaryKeyName, columns);
         }
     }
-
-    /**
-     * Parse columns from columns config.
-     *
-     * <pre>
-     *     columns = [
-     *      {
-     *          name = "name"
-     *          type = "string"
-     *          columnLength = 0
-     *          nullable = true
-     *          defaultValue = null
-     *          comment = "name"
-     *     },
-     *     {
-     *          name = "age"
-     *          type = "int"
-     *          columnLength = 0
-     *          nullable = true
-     *          defaultValue = null
-     *          comment = "age"
-     *     }
-     *     ]
-     * </pre>
-     *
-     * @param columnConfig columns config
-     * @return columns
-     */
-    private Column parseFromColumn(ReadonlyConfig columnConfig) {
-        String name =
-                columnConfig
-                        .getOptional(TableSchemaOptions.ColumnOptions.NAME)
-                        .orElseThrow(
-                                () ->
-                                        new IllegalArgumentException(
-                                                "schema.columns.* config need 
option [name], please correct your config first"));
-        SeaTunnelDataType<?> seaTunnelDataType =
-                columnConfig
-                        .getOptional(TableSchemaOptions.ColumnOptions.TYPE)
-                        
.map(SeaTunnelDataTypeConvertorUtil::deserializeSeaTunnelDataType)
-                        .orElseThrow(
-                                () ->
-                                        new IllegalArgumentException(
-                                                "schema.columns.* config need 
option [type], please correct your config first"));
-
-        Integer columnLength = 
columnConfig.get(TableSchemaOptions.ColumnOptions.COLUMN_LENGTH);
-        Boolean nullable = 
columnConfig.get(TableSchemaOptions.ColumnOptions.NULLABLE);
-        Object defaultValue = 
columnConfig.get(TableSchemaOptions.ColumnOptions.DEFAULT_VALUE);
-        String comment = 
columnConfig.get(TableSchemaOptions.ColumnOptions.COMMENT);
-        return PhysicalColumn.of(
-                name, seaTunnelDataType, columnLength, nullable, defaultValue, 
comment);
-    }
-
-    /**
-     * Parse primary key from primary key config.
-     *
-     * <pre>
-     *     primaryKey {
-     *          name = "primary_key"
-     *          columnNames = ["name", "age"]
-     *     }
-     * </pre>
-     *
-     * @param primaryKeyConfig primary key config
-     * @return primary key
-     */
-    private PrimaryKey parsePrimaryKey(Map<String, Object> primaryKeyConfig) {
-        if (!primaryKeyConfig.containsKey(
-                        
TableSchemaOptions.PrimaryKeyOptions.PRIMARY_KEY_NAME.key())
-                || !primaryKeyConfig.containsKey(
-                        
TableSchemaOptions.PrimaryKeyOptions.PRIMARY_KEY_COLUMNS.key())) {
-            throw new IllegalArgumentException(
-                    "Schema config need option [primaryKey.name, 
primaryKey.columnNames], please correct your config first");
-        }
-
-        String primaryKeyName =
-                (String)
-                        primaryKeyConfig.get(
-                                
TableSchemaOptions.PrimaryKeyOptions.PRIMARY_KEY_NAME.key());
-        List<String> columns =
-                (List<String>)
-                        primaryKeyConfig.get(
-                                
TableSchemaOptions.PrimaryKeyOptions.PRIMARY_KEY_COLUMNS.key());
-        return new PrimaryKey(primaryKeyName, columns);
-    }
-
-    private ConstraintKey parseConstraintKeys(ReadonlyConfig 
constraintKeyConfig) {
-        String constraintName =
-                constraintKeyConfig
-                        
.getOptional(TableSchemaOptions.ConstraintKeyOptions.CONSTRAINT_KEY_NAME)
-                        .orElseThrow(
-                                () ->
-                                        new IllegalArgumentException(
-                                                "schema.constraintKeys.* 
config need option [constraintName], please correct your config first"));
-        ConstraintKey.ConstraintType constraintType =
-                constraintKeyConfig
-                        
.getOptional(TableSchemaOptions.ConstraintKeyOptions.CONSTRAINT_KEY_TYPE)
-                        .orElseThrow(
-                                () ->
-                                        new IllegalArgumentException(
-                                                "schema.constraintKeys.* 
config need option [constraintType], please correct your config first"));
-        List<ConstraintKey.ConstraintKeyColumn> columns =
-                constraintKeyConfig
-                        
.getOptional(TableSchemaOptions.ConstraintKeyOptions.CONSTRAINT_KEY_COLUMNS)
-                        .map(
-                                constraintColumnMapList -> {
-                                    return constraintColumnMapList.stream()
-                                            .map(ReadonlyConfig::fromMap)
-                                            .map(
-                                                    constraintColumnConfig -> {
-                                                        String columnName =
-                                                                
constraintColumnConfig
-                                                                        
.getOptional(
-                                                                               
 TableSchemaOptions
-                                                                               
         .ConstraintKeyOptions
-                                                                               
         .CONSTRAINT_KEY_COLUMN_NAME)
-                                                                        
.orElseThrow(
-                                                                               
 () ->
-                                                                               
         new IllegalArgumentException(
-                                                                               
                 "schema.constraintKeys.constraintColumns.* config need option 
[columnName], please correct your config first"));
-                                                        
ConstraintKey.ColumnSortType
-                                                                columnSortType 
=
-                                                                        
constraintColumnConfig.get(
-                                                                               
 TableSchemaOptions
-                                                                               
         .ConstraintKeyOptions
-                                                                               
         .CONSTRAINT_KEY_COLUMN_SORT_TYPE);
-                                                        return 
ConstraintKey.ConstraintKeyColumn.of(
-                                                                columnName, 
columnSortType);
-                                                    })
-                                            .collect(Collectors.toList());
-                                })
-                        .orElseThrow(
-                                () ->
-                                        new IllegalArgumentException(
-                                                "schema.constraintKeys.* 
config need option [columns], please correct your config first"));
-
-        return ConstraintKey.of(constraintType, constraintName, columns);
-    }
 }
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java
new file mode 100644
index 0000000000..cbcc0f1bd9
--- /dev/null
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtilTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class SeaTunnelDataTypeConvertorUtilTest {
+
+    @Test
+    void testParseWithUnsupportedType() {
+
+        UnsupportedOperationException exception =
+                Assertions.assertThrows(
+                        UnsupportedOperationException.class,
+                        () -> 
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType("uuid"));
+        Assertions.assertEquals("the type[uuid] is not support", 
exception.getMessage());
+
+        RuntimeException exception2 =
+                Assertions.assertThrows(
+                        RuntimeException.class,
+                        () ->
+                                
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+                                        "{uuid}"));
+        Assertions.assertEquals(
+                "String json deserialization exception.{uuid}", 
exception2.getMessage());
+    }
+}

Reply via email to