This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push:
new 9e0d6748 [Hotfix] Fix convert virtual table of kafka to table field
error (#101)
9e0d6748 is described below
commit 9e0d6748ceee6a2a9a788acaeccbbf8c0ac532ca
Author: Kim <[email protected]>
AuthorDate: Thu Aug 24 14:12:58 2023 +0800
[Hotfix] Fix convert virtual table of kafka to table field error (#101)
Co-authored-by: zhujinming <[email protected]>
---
.../seatunnel/app/service/ITableSchemaService.java | 3 +-
.../app/service/impl/DatasourceServiceImpl.java | 4 +--
.../app/service/impl/TableSchemaServiceImpl.java | 35 +++++++++++++---------
3 files changed, 25 insertions(+), 17 deletions(-)
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITableSchemaService.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITableSchemaService.java
index 21c1cc1d..5d476d6c 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITableSchemaService.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/ITableSchemaService.java
@@ -27,7 +27,8 @@ import java.util.List;
public interface ITableSchemaService {
TableSchemaRes getSeaTunnelSchema(String pluginName, TableSchemaReq
tableSchemaReq);
- void getAddSeaTunnelSchema(List<TableField> tableFields, String
pluginName);
+ void getAddSeaTunnelSchema(
+ List<TableField> tableFields, String pluginName, Boolean
isVirtualTable);
boolean getColumnProjection(String pluginName);
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/DatasourceServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/DatasourceServiceImpl.java
index 6f4cfd60..04ef5a93 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/DatasourceServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/DatasourceServiceImpl.java
@@ -341,7 +341,7 @@ public class DatasourceServiceImpl extends
SeatunnelBaseServiceImpl
DataSourceClientFactory.getDataSourceClient()
.getTableFields(pluginName, datasourceConfig,
databaseName, tableName);
- tableSchemaService.getAddSeaTunnelSchema(tableFields, pluginName);
+ tableSchemaService.getAddSeaTunnelSchema(tableFields, pluginName,
false);
return tableFields;
}
VirtualTable virtualTable =
virtualTableDao.selectVirtualTableByTableName(tableName);
@@ -352,7 +352,7 @@ public class DatasourceServiceImpl extends
SeatunnelBaseServiceImpl
// convert virtual table to table field
// virtualTable.getTableFields()
List<TableField> tableFields =
convertTableSchema(virtualTable.getTableFields());
- tableSchemaService.getAddSeaTunnelSchema(tableFields, pluginName);
+ tableSchemaService.getAddSeaTunnelSchema(tableFields, pluginName,
true);
return tableFields;
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TableSchemaServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TableSchemaServiceImpl.java
index fd3ec806..554651c6 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TableSchemaServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/TableSchemaServiceImpl.java
@@ -104,7 +104,8 @@ public class TableSchemaServiceImpl extends
SeatunnelBaseServiceImpl
}
@Override
- public void getAddSeaTunnelSchema(List<TableField> tableFields, String
pluginName) {
+ public void getAddSeaTunnelSchema(
+ List<TableField> tableFields, String pluginName, Boolean
isVirtualTable) {
pluginName = pluginName.toUpperCase();
if (pluginName.endsWith("-CDC")) {
pluginName = pluginName.replace("-CDC", "");
@@ -113,19 +114,25 @@ public class TableSchemaServiceImpl extends
SeatunnelBaseServiceImpl
} else if (pluginName.startsWith("JDBC-")) {
pluginName = pluginName.replace("JDBC-", "");
}
- DataTypeConvertor<?> convertor =
factory.getDataTypeConvertor(pluginName);
- for (TableField field : tableFields) {
- try {
- SeaTunnelDataType<?> dataType =
convertor.toSeaTunnelType(field.getType());
- field.setUnSupport(false);
- field.setOutputDataType(dataType.toString());
- } catch (Exception exception) {
- field.setUnSupport(true);
- log.warn(
- "Database {} , field {} is unSupport",
- pluginName,
- field.getType(),
- exception);
+ try {
+ DataTypeConvertor<?> convertor =
factory.getDataTypeConvertor(pluginName);
+ for (TableField field : tableFields) {
+ try {
+ SeaTunnelDataType<?> dataType =
convertor.toSeaTunnelType(field.getType());
+ field.setUnSupport(false);
+ field.setOutputDataType(dataType.toString());
+ } catch (Exception exception) {
+ field.setUnSupport(true);
+ log.warn(
+ "Database {} , field {} is unSupport",
+ pluginName,
+ field.getType(),
+ exception);
+ }
+ }
+ } catch (Exception e) {
+ if (!isVirtualTable) {
+ throw e;
}
}
}