kalencaya opened a new issue #1428:
URL: https://github.com/apache/incubator-seatunnel/issues/1428


   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   SeaTunnel implements data integration through the source -> transform -> 
sink pipeline, and every Flink plugin supports the `result_table_name` and 
`source_table_name` options for converting between DataStream/DataSet and 
Table via the Table API.
   
   Below is the `TableUtil` source code, which connects sources, transforms, and sinks.
   ```java
   public final class TableUtil {
   
       private TableUtil() {
       }
   
       public static DataStream<Row> tableToDataStream(StreamTableEnvironment 
tableEnvironment, Table table, boolean isAppend) {
   
           TypeInformation<Row> typeInfo = table.getSchema().toRowType();
           if (isAppend) {
               return tableEnvironment.toAppendStream(table, typeInfo);
           }
           return tableEnvironment
                   .toRetractStream(table, typeInfo)
                   .filter(row -> row.f0)
                   .map(row -> row.f1)
                   .returns(typeInfo);
       }
   
       public static DataSet<Row> tableToDataSet(BatchTableEnvironment 
tableEnvironment, Table table) {
           return tableEnvironment.toDataSet(table, 
table.getSchema().toRowType());
       }
   
       public static void dataStreamToTable(StreamTableEnvironment 
tableEnvironment, String tableName, DataStream<Row> dataStream) {
           tableEnvironment.registerDataStream(tableName, dataStream);
       }
   
       public static void dataSetToTable(BatchTableEnvironment 
tableEnvironment, String tableName, DataSet<Row> dataSet) {
           tableEnvironment.registerDataSet(tableName, dataSet);
       }
   
       public static boolean tableExists(TableEnvironment tableEnvironment, 
String name) {
           String currentCatalog = tableEnvironment.getCurrentCatalog();
           Catalog catalog = tableEnvironment.getCatalog(currentCatalog).get();
           ObjectPath objectPath = new 
ObjectPath(tableEnvironment.getCurrentDatabase(), name);
           return catalog.tableExists(objectPath);
       }
   }
   ```
   The problem is that when a DataStream or DataSet contains a TypeInformation 
that the Table API does not support, an exception is thrown.
   
   In the case of the Flink JDBC plugin, Flink only supports a handful of 
types, but each RDBMS has its own SQL dialect, and `JdbcSource#informationMapping` 
has to provide the mapping from JDBC types to TypeInformation. If we add a 
TypeInformation that is not covered by `JdbcTypeUtil#TYPE_MAPPING` and 
`JdbcTypeUtil#SQL_TYPE_NAMES`, the conversion between DataSet/DataStream and 
Table may fail with an exception.
   ```java
   public class JdbcTypeUtil {
       private static final Map<TypeInformation<?>, Integer> TYPE_MAPPING;
       private static final Map<Integer, String> SQL_TYPE_NAMES;
   
       public static int typeInformationToSqlType(TypeInformation<?> type) {
           if (TYPE_MAPPING.containsKey(type)) {
            return TYPE_MAPPING.get(type);
           } else if (!(type instanceof ObjectArrayTypeInfo) && !(type 
instanceof PrimitiveArrayTypeInfo)) {
               throw new IllegalArgumentException("Unsupported type: " + type);
           } else {
               return 2003;
           }
       }
   
       static {
        HashMap<TypeInformation<?>, Integer> m = new HashMap<>();
           m.put(BasicTypeInfo.STRING_TYPE_INFO, 12);
           m.put(BasicTypeInfo.BOOLEAN_TYPE_INFO, 16);
           m.put(BasicTypeInfo.BYTE_TYPE_INFO, -6);
           m.put(BasicTypeInfo.SHORT_TYPE_INFO, 5);
           m.put(BasicTypeInfo.INT_TYPE_INFO, 4);
           m.put(BasicTypeInfo.LONG_TYPE_INFO, -5);
           m.put(BasicTypeInfo.FLOAT_TYPE_INFO, 7);
           m.put(BasicTypeInfo.DOUBLE_TYPE_INFO, 8);
           m.put(SqlTimeTypeInfo.DATE, 91);
           m.put(SqlTimeTypeInfo.TIME, 92);
           m.put(SqlTimeTypeInfo.TIMESTAMP, 93);
           m.put(LocalTimeTypeInfo.LOCAL_DATE, 91);
           m.put(LocalTimeTypeInfo.LOCAL_TIME, 92);
           m.put(LocalTimeTypeInfo.LOCAL_DATE_TIME, 93);
           m.put(BasicTypeInfo.BIG_DEC_TYPE_INFO, 3);
           m.put(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, -2);
           TYPE_MAPPING = Collections.unmodifiableMap(m);
        HashMap<Integer, String> names = new HashMap<>();
           names.put(12, "VARCHAR");
           names.put(16, "BOOLEAN");
           names.put(-6, "TINYINT");
           names.put(5, "SMALLINT");
           names.put(4, "INTEGER");
           names.put(-5, "BIGINT");
           names.put(6, "FLOAT");
           names.put(8, "DOUBLE");
           names.put(1, "CHAR");
           names.put(91, "DATE");
           names.put(92, "TIME");
           names.put(93, "TIMESTAMP");
           names.put(3, "DECIMAL");
           names.put(-2, "BINARY");
           SQL_TYPE_NAMES = Collections.unmodifiableMap(names);
       }
   }
   ```
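   The failure mode can be reproduced without Flink. The sketch below is a minimal, hypothetical model of the lookup-and-throw pattern in `JdbcTypeUtil#typeInformationToSqlType`: any type that was never registered in the unmodifiable map triggers an `IllegalArgumentException`, which is what happens when a dialect-specific JDBC type is mapped to a TypeInformation the Table API does not know about.
   ```java
   import java.sql.Types;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical standalone model (no Flink dependency) of the
   // JdbcTypeUtil lookup: types are keyed by name here for simplicity.
   public class TypeMappingSketch {
       private static final Map<String, Integer> TYPE_MAPPING;

       static {
           Map<String, Integer> m = new HashMap<>();
           m.put("STRING", Types.VARCHAR); // 12
           m.put("INT", Types.INTEGER);    // 4
           TYPE_MAPPING = Collections.unmodifiableMap(m);
       }

       public static int toSqlType(String typeName) {
           Integer sqlType = TYPE_MAPPING.get(typeName);
           if (sqlType == null) {
               // Mirrors the failure described above: an RDBMS-specific
               // type that was never registered cannot be converted.
               throw new IllegalArgumentException("Unsupported type: " + typeName);
           }
           return sqlType;
       }

       public static void main(String[] args) {
           System.out.println(toSqlType("STRING")); // prints 12
           try {
               toSqlType("UUID"); // e.g. a PostgreSQL-specific type
           } catch (IllegalArgumentException e) {
               System.out.println(e.getMessage());
           }
       }
   }
   ```
   Because `TYPE_MAPPING` is a fixed, unmodifiable map, there is no extension point for plugins to register additional mappings, which is the core of the problem reported here.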
   
   ### SeaTunnel Version
   
   dev branch
   
   ### SeaTunnel Config
   
   ```conf
   not needed
   ```
   
   
   ### Running Command
   
   ```shell
   not needed
   ```
   
   
   ### Error Exception
   
   ```log
   not needed
   ```
   
   
   ### Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

