This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2d1db66b9f [Fix][Doris] Fix catalog not closed (#8415)
2d1db66b9f is described below

commit 2d1db66b9f02011a2e8d7c645b799a46b668a4a7
Author: hailin0 <[email protected]>
AuthorDate: Thu Jan 2 09:07:51 2025 +0800

    [Fix][Doris] Fix catalog not closed (#8415)
---
 .../doris/source/DorisSourceFactory.java           | 62 +++++++++++-----------
 1 file changed, 32 insertions(+), 30 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
index 506a7c97dc..05f3e408ed 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
@@ -83,39 +83,41 @@ public class DorisSourceFactory implements TableSourceFactory {
         DorisSourceConfig dorisSourceConfig = DorisSourceConfig.of(context.getOptions());
         List<DorisTableConfig> dorisTableConfigList = dorisSourceConfig.getTableConfigList();
         Map<TablePath, DorisSourceTable> dorisSourceTables = new HashMap<>();
-        for (DorisTableConfig dorisTableConfig : dorisTableConfigList) {
-            CatalogTable table;
-            DorisCatalogFactory dorisCatalogFactory = new DorisCatalogFactory();
-            DorisCatalog catalog =
-                    (DorisCatalog) dorisCatalogFactory.createCatalog("doris", context.getOptions());
+
+        DorisCatalogFactory dorisCatalogFactory = new DorisCatalogFactory();
+        try (DorisCatalog catalog =
+                (DorisCatalog) dorisCatalogFactory.createCatalog("doris", context.getOptions())) {
             catalog.open();
-            TablePath tablePath = TablePath.of(dorisTableConfig.getTableIdentifier());
-            String readFields = dorisTableConfig.getReadField();
-            try {
-                List<String> readFiledList = null;
-                if (StringUtils.isNotBlank(readFields)) {
-                    readFiledList =
-                            Arrays.stream(readFields.split(","))
-                                    .map(String::trim)
-                                    .collect(Collectors.toList());
-                }
+            for (DorisTableConfig dorisTableConfig : dorisTableConfigList) {
+                CatalogTable table;
+                TablePath tablePath = TablePath.of(dorisTableConfig.getTableIdentifier());
+                String readFields = dorisTableConfig.getReadField();
+                try {
+                    List<String> readFiledList = null;
+                    if (StringUtils.isNotBlank(readFields)) {
+                        readFiledList =
+                                Arrays.stream(readFields.split(","))
+                                        .map(String::trim)
+                                        .collect(Collectors.toList());
+                    }
 
-                table = catalog.getTable(tablePath, readFiledList);
-            } catch (Exception e) {
-                log.error("create source error");
-                throw e;
+                    table = catalog.getTable(tablePath, readFiledList);
+                } catch (Exception e) {
+                    log.error("create source error");
+                    throw e;
+                }
+                dorisSourceTables.put(
+                        tablePath,
+                        DorisSourceTable.builder()
+                                .catalogTable(table)
+                                .tablePath(tablePath)
+                                .readField(readFields)
+                                .filterQuery(dorisTableConfig.getFilterQuery())
+                                .batchSize(dorisTableConfig.getBatchSize())
+                                .tabletSize(dorisTableConfig.getTabletSize())
+                                .execMemLimit(dorisTableConfig.getExecMemLimit())
+                                .build());
             }
-            dorisSourceTables.put(
-                    tablePath,
-                    DorisSourceTable.builder()
-                            .catalogTable(table)
-                            .tablePath(tablePath)
-                            .readField(readFields)
-                            .filterQuery(dorisTableConfig.getFilterQuery())
-                            .batchSize(dorisTableConfig.getBatchSize())
-                            .tabletSize(dorisTableConfig.getTabletSize())
-                            .execMemLimit(dorisTableConfig.getExecMemLimit())
-                            .build());
         }
         return () ->
                 (SeaTunnelSource<T, SplitT, StateT>)

Reply via email to