Hisoka-X commented on code in PR #5581:
URL: https://github.com/apache/seatunnel/pull/5581#discussion_r1366574099
##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java:
##########
@@ -61,91 +53,48 @@ public class JdbcSource
protected static final Logger LOG =
LoggerFactory.getLogger(JdbcSource.class);
private JdbcSourceConfig jdbcSourceConfig;
- private SeaTunnelRowType typeInfo;
-
- private JdbcDialect jdbcDialect;
- private JdbcInputFormat inputFormat;
- private PartitionParameter partitionParameter;
- private JdbcConnectionProvider jdbcConnectionProvider;
-
- private String query;
+ private Map<TablePath, JdbcSourceTable> jdbcSourceTables;
- public JdbcSource(
- JdbcSourceConfig jdbcSourceConfig,
- SeaTunnelRowType typeInfo,
- JdbcDialect jdbcDialect,
- JdbcInputFormat inputFormat,
- PartitionParameter partitionParameter,
- JdbcConnectionProvider jdbcConnectionProvider,
- String query) {
+ @SneakyThrows
+ public JdbcSource(JdbcSourceConfig jdbcSourceConfig) {
this.jdbcSourceConfig = jdbcSourceConfig;
- this.typeInfo = typeInfo;
- this.jdbcDialect = jdbcDialect;
- this.inputFormat = inputFormat;
- this.partitionParameter = partitionParameter;
- this.jdbcConnectionProvider = jdbcConnectionProvider;
- this.query = query;
+ this.jdbcSourceTables =
+ JdbcCatalogUtils.getTables(
+ jdbcSourceConfig.getJdbcConnectionConfig(),
+ jdbcSourceConfig.getTableConfigList());
}
@Override
public String getPluginName() {
return "Jdbc";
}
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
- ConfigValidator.of(config).validate(new
JdbcSourceFactory().optionRule());
- this.jdbcSourceConfig = JdbcSourceConfig.of(config);
- this.jdbcDialect =
- JdbcDialectLoader.load(
- jdbcSourceConfig.getJdbcConnectionConfig().getUrl(),
-
jdbcSourceConfig.getJdbcConnectionConfig().getCompatibleMode());
- this.jdbcDialect.connectionUrlParse(
- jdbcSourceConfig.getJdbcConnectionConfig().getUrl(),
- jdbcSourceConfig.getJdbcConnectionConfig().getProperties(),
- this.jdbcDialect.defaultParameter());
- this.jdbcConnectionProvider =
- new
SimpleJdbcConnectionProvider(jdbcSourceConfig.getJdbcConnectionConfig());
- this.query = jdbcSourceConfig.getQuery();
- try (Connection connection =
jdbcConnectionProvider.getOrEstablishConnection()) {
- this.typeInfo = initTableField(connection);
- this.partitionParameter =
-
createPartitionParameter(jdbcConnectionProvider.getOrEstablishConnection());
- } catch (Exception e) {
- throw new PrepareFailException("jdbc", PluginType.SOURCE,
e.toString());
- }
-
- if (partitionParameter != null) {
- this.query =
- JdbcSourceFactory.obtainPartitionSql(
- jdbcDialect, partitionParameter,
jdbcSourceConfig.getQuery());
- }
-
- this.inputFormat =
- new JdbcInputFormat(
- jdbcConnectionProvider,
- jdbcDialect,
- typeInfo,
- query,
- jdbcSourceConfig.getFetchSize(),
-
jdbcSourceConfig.getJdbcConnectionConfig().isAutoCommit());
- }
-
@Override
public Boundedness getBoundedness() {
return Boundedness.BOUNDED;
}
@Override
- public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
Review Comment:
Remove `getProducedType` should waiting #5670
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]