kalencaya opened a new issue #1428: URL: https://github.com/apache/incubator-seatunnel/issues/1428
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

SeaTunnel implements data integration as a source -> transforms -> sink pipeline, and every Flink plugin supports the `result_table_name` or `source_table_name` option, which converts between a DataStream or DataSet and a Table through the Table API.

Below is the `TableUtil` source code, which connects sources, transforms, and sinks.

```
public final class TableUtil {

    private TableUtil() {
    }

    public static DataStream<Row> tableToDataStream(StreamTableEnvironment tableEnvironment, Table table, boolean isAppend) {
        TypeInformation<Row> typeInfo = table.getSchema().toRowType();
        if (isAppend) {
            return tableEnvironment.toAppendStream(table, typeInfo);
        }
        return tableEnvironment
                .toRetractStream(table, typeInfo)
                .filter(row -> row.f0)
                .map(row -> row.f1)
                .returns(typeInfo);
    }

    public static DataSet<Row> tableToDataSet(BatchTableEnvironment tableEnvironment, Table table) {
        return tableEnvironment.toDataSet(table, table.getSchema().toRowType());
    }

    public static void dataStreamToTable(StreamTableEnvironment tableEnvironment, String tableName, DataStream<Row> dataStream) {
        tableEnvironment.registerDataStream(tableName, dataStream);
    }

    public static void dataSetToTable(BatchTableEnvironment tableEnvironment, String tableName, DataSet<Row> dataSet) {
        tableEnvironment.registerDataSet(tableName, dataSet);
    }

    public static boolean tableExists(TableEnvironment tableEnvironment, String name) {
        String currentCatalog = tableEnvironment.getCurrentCatalog();
        Catalog catalog = tableEnvironment.getCatalog(currentCatalog).get();
        ObjectPath objectPath = new ObjectPath(tableEnvironment.getCurrentDatabase(), name);
        return catalog.tableExists(objectPath);
    }
}
```

The problem is that when a DataStream or DataSet contains a TypeInformation that the Table API does not support, the conversion throws an exception.
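The failure mode described above can be illustrated without Flink on the classpath. The following is only a minimal sketch: `TypeMappingSketch`, `isSupported`, and `toSqlType` are hypothetical names, and plain Java classes stand in for Flink's `TypeInformation`. It shows how a lookup backed by a fixed whitelist rejects every type it does not know, and how a caller could check a type up front instead of failing during the conversion.

```java
import java.math.BigDecimal;
import java.sql.Types;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical illustration only; this class is not part of SeaTunnel or Flink.
final class TypeMappingSketch {

    // Fixed whitelist: Java class -> java.sql.Types constant.
    // Plain classes are an assumed stand-in for Flink TypeInformation keys.
    private static final Map<Class<?>, Integer> SUPPORTED;

    static {
        Map<Class<?>, Integer> m = new HashMap<>();
        m.put(String.class, Types.VARCHAR);
        m.put(Boolean.class, Types.BOOLEAN);
        m.put(Integer.class, Types.INTEGER);
        m.put(Long.class, Types.BIGINT);
        m.put(Double.class, Types.DOUBLE);
        m.put(BigDecimal.class, Types.DECIMAL);
        SUPPORTED = Collections.unmodifiableMap(m);
    }

    private TypeMappingSketch() {
    }

    // Lets a caller test a type before attempting the conversion.
    static boolean isSupported(Class<?> type) {
        return SUPPORTED.containsKey(type);
    }

    // Throws for any type that is missing from the whitelist.
    static int toSqlType(Class<?> type) {
        Integer sqlType = SUPPORTED.get(type);
        if (sqlType == null) {
            throw new IllegalArgumentException("Unsupported type: " + type.getName());
        }
        return sqlType;
    }

    public static void main(String[] args) {
        System.out.println(toSqlType(String.class)); // prints 12 (Types.VARCHAR)
        System.out.println(isSupported(UUID.class)); // prints false
        try {
            toSqlType(UUID.class);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints "Unsupported type: java.util.UUID"
        }
    }
}
```

Any dialect-specific type that a source introduces but the whitelist lacks ends up in the exception branch, which is the situation this issue describes.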
Take the Flink JDBC plugin as an example. Flink supports only a few types, but each RDBMS has its own SQL dialect, so `JdbcSource#informationMapping` has to map JDBC types to TypeInformation. If that mapping produces a TypeInformation that is not covered by `JdbcTypeUtil#TYPE_MAPPING` and `JdbcTypeUtil#SQL_TYPE_NAMES`, the conversion between a DataSet or DataStream and a Table may fail with an exception.

```
public class JdbcTypeUtil {
    private static final Map<TypeInformation<?>, Integer> TYPE_MAPPING;
    private static final Map<Integer, String> SQL_TYPE_NAMES;

    public static int typeInformationToSqlType(TypeInformation<?> type) {
        if (TYPE_MAPPING.containsKey(type)) {
            return (Integer)TYPE_MAPPING.get(type);
        } else if (!(type instanceof ObjectArrayTypeInfo) && !(type instanceof PrimitiveArrayTypeInfo)) {
            throw new IllegalArgumentException("Unsupported type: " + type);
        } else {
            return 2003;
        }
    }

    static {
        HashMap<TypeInformation<?>, Integer> m = new HashMap();
        m.put(BasicTypeInfo.STRING_TYPE_INFO, 12);
        m.put(BasicTypeInfo.BOOLEAN_TYPE_INFO, 16);
        m.put(BasicTypeInfo.BYTE_TYPE_INFO, -6);
        m.put(BasicTypeInfo.SHORT_TYPE_INFO, 5);
        m.put(BasicTypeInfo.INT_TYPE_INFO, 4);
        m.put(BasicTypeInfo.LONG_TYPE_INFO, -5);
        m.put(BasicTypeInfo.FLOAT_TYPE_INFO, 7);
        m.put(BasicTypeInfo.DOUBLE_TYPE_INFO, 8);
        m.put(SqlTimeTypeInfo.DATE, 91);
        m.put(SqlTimeTypeInfo.TIME, 92);
        m.put(SqlTimeTypeInfo.TIMESTAMP, 93);
        m.put(LocalTimeTypeInfo.LOCAL_DATE, 91);
        m.put(LocalTimeTypeInfo.LOCAL_TIME, 92);
        m.put(LocalTimeTypeInfo.LOCAL_DATE_TIME, 93);
        m.put(BasicTypeInfo.BIG_DEC_TYPE_INFO, 3);
        m.put(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, -2);
        TYPE_MAPPING = Collections.unmodifiableMap(m);

        HashMap<Integer, String> names = new HashMap();
        names.put(12, "VARCHAR");
        names.put(16, "BOOLEAN");
        names.put(-6, "TINYINT");
        names.put(5, "SMALLINT");
        names.put(4, "INTEGER");
        names.put(-5, "BIGINT");
        names.put(6, "FLOAT");
        names.put(8, "DOUBLE");
        names.put(1, "CHAR");
        names.put(91, "DATE");
        names.put(92, "TIME");
        names.put(93, "TIMESTAMP");
        names.put(3, "DECIMAL");
        names.put(-2, "BINARY");
        SQL_TYPE_NAMES = Collections.unmodifiableMap(names);
    }
}
```

### SeaTunnel Version

dev branch

### SeaTunnel Config

```conf
not needed
```

### Running Command

```shell
not needed
```

### Error Exception

```log
not needed
```

### Flink or Spark Version

_No response_

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
