This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2d1db66b9f [Fix][Doris] Fix catalog not closed (#8415)
2d1db66b9f is described below
commit 2d1db66b9f02011a2e8d7c645b799a46b668a4a7
Author: hailin0 <[email protected]>
AuthorDate: Thu Jan 2 09:07:51 2025 +0800
[Fix][Doris] Fix catalog not closed (#8415)
---
.../doris/source/DorisSourceFactory.java | 62 +++++++++++-----------
1 file changed, 32 insertions(+), 30 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
index 506a7c97dc..05f3e408ed 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
@@ -83,39 +83,41 @@ public class DorisSourceFactory implements TableSourceFactory {
DorisSourceConfig dorisSourceConfig = DorisSourceConfig.of(context.getOptions());
List<DorisTableConfig> dorisTableConfigList = dorisSourceConfig.getTableConfigList();
Map<TablePath, DorisSourceTable> dorisSourceTables = new HashMap<>();
- for (DorisTableConfig dorisTableConfig : dorisTableConfigList) {
- CatalogTable table;
- DorisCatalogFactory dorisCatalogFactory = new DorisCatalogFactory();
- DorisCatalog catalog =
- (DorisCatalog) dorisCatalogFactory.createCatalog("doris", context.getOptions());
+
+ DorisCatalogFactory dorisCatalogFactory = new DorisCatalogFactory();
+ try (DorisCatalog catalog =
+ (DorisCatalog) dorisCatalogFactory.createCatalog("doris", context.getOptions())) {
catalog.open();
- TablePath tablePath = TablePath.of(dorisTableConfig.getTableIdentifier());
- String readFields = dorisTableConfig.getReadField();
- try {
- List<String> readFiledList = null;
- if (StringUtils.isNotBlank(readFields)) {
- readFiledList =
- Arrays.stream(readFields.split(","))
- .map(String::trim)
- .collect(Collectors.toList());
- }
+ for (DorisTableConfig dorisTableConfig : dorisTableConfigList) {
+ CatalogTable table;
+ TablePath tablePath = TablePath.of(dorisTableConfig.getTableIdentifier());
+ String readFields = dorisTableConfig.getReadField();
+ try {
+ List<String> readFiledList = null;
+ if (StringUtils.isNotBlank(readFields)) {
+ readFiledList =
+ Arrays.stream(readFields.split(","))
+ .map(String::trim)
+ .collect(Collectors.toList());
+ }
- table = catalog.getTable(tablePath, readFiledList);
- } catch (Exception e) {
- log.error("create source error");
- throw e;
+ table = catalog.getTable(tablePath, readFiledList);
+ } catch (Exception e) {
+ log.error("create source error");
+ throw e;
+ }
+ dorisSourceTables.put(
+ tablePath,
+ DorisSourceTable.builder()
+ .catalogTable(table)
+ .tablePath(tablePath)
+ .readField(readFields)
+ .filterQuery(dorisTableConfig.getFilterQuery())
+ .batchSize(dorisTableConfig.getBatchSize())
+ .tabletSize(dorisTableConfig.getTabletSize())
+ .execMemLimit(dorisTableConfig.getExecMemLimit())
+ .build());
}
- dorisSourceTables.put(
- tablePath,
- DorisSourceTable.builder()
- .catalogTable(table)
- .tablePath(tablePath)
- .readField(readFields)
- .filterQuery(dorisTableConfig.getFilterQuery())
- .batchSize(dorisTableConfig.getBatchSize())
- .tabletSize(dorisTableConfig.getTabletSize())
- .execMemLimit(dorisTableConfig.getExecMemLimit())
- .build());
}
return () ->
(SeaTunnelSource<T, SplitT, StateT>)