This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1eddb8e1b1 [Improve][Connector-V2] Optimize milvus code (#7691)
1eddb8e1b1 is described below
commit 1eddb8e1b13a9cd42fc2fc7987e05a5c196c6489
Author: corgy-w <[email protected]>
AuthorDate: Sat Sep 21 22:16:29 2024 +0800
[Improve][Connector-V2] Optimize milvus code (#7691)
---
.../milvus/convert/MilvusConvertUtils.java | 73 ++++++++++++----------
1 file changed, 41 insertions(+), 32 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java
index 0e7a898b23..6502707795 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/convert/MilvusConvertUtils.java
@@ -80,43 +80,52 @@ public class MilvusConvertUtils {
private static final Gson gson = new Gson();
public static Map<TablePath, CatalogTable> getSourceTables(ReadonlyConfig
config) {
- MilvusServiceClient client =
- new MilvusServiceClient(
- ConnectParam.newBuilder()
- .withUri(config.get(MilvusSourceConfig.URL))
-
.withToken(config.get(MilvusSourceConfig.TOKEN))
- .build());
-
- String database = config.get(MilvusSourceConfig.DATABASE);
- List<String> collectionList = new ArrayList<>();
- if (StringUtils.isNotEmpty(config.get(MilvusSourceConfig.COLLECTION)))
{
- collectionList.add(config.get(MilvusSourceConfig.COLLECTION));
- } else {
- R<ShowCollectionsResponse> response =
- client.showCollections(
- ShowCollectionsParam.newBuilder()
- .withDatabaseName(database)
- .withShowType(ShowType.All)
+ MilvusServiceClient client = null;
+ try {
+ client =
+ new MilvusServiceClient(
+ ConnectParam.newBuilder()
+
.withUri(config.get(MilvusSourceConfig.URL))
+
.withToken(config.get(MilvusSourceConfig.TOKEN))
.build());
- if (response.getStatus() != R.Status.Success.getCode()) {
- throw new MilvusConnectorException(
- MilvusConnectionErrorCode.SHOW_COLLECTIONS_ERROR);
- }
- ProtocolStringList collections =
response.getData().getCollectionNamesList();
- if (CollectionUtils.isEmpty(collections)) {
- throw new MilvusConnectorException(
- MilvusConnectionErrorCode.DATABASE_NO_COLLECTIONS,
database);
+ String database = config.get(MilvusSourceConfig.DATABASE);
+ List<String> collectionList = new ArrayList<>();
+ if
(StringUtils.isNotEmpty(config.get(MilvusSourceConfig.COLLECTION))) {
+ collectionList.add(config.get(MilvusSourceConfig.COLLECTION));
+ } else {
+ R<ShowCollectionsResponse> response =
+ client.showCollections(
+ ShowCollectionsParam.newBuilder()
+ .withDatabaseName(database)
+ .withShowType(ShowType.All)
+ .build());
+ if (response.getStatus() != R.Status.Success.getCode()) {
+ throw new MilvusConnectorException(
+ MilvusConnectionErrorCode.SHOW_COLLECTIONS_ERROR);
+ }
+
+ ProtocolStringList collections =
response.getData().getCollectionNamesList();
+ if (CollectionUtils.isEmpty(collections)) {
+ throw new MilvusConnectorException(
+ MilvusConnectionErrorCode.DATABASE_NO_COLLECTIONS,
database);
+ }
+ collectionList.addAll(collections);
}
- collectionList.addAll(collections);
- }
- Map<TablePath, CatalogTable> map = new HashMap<>();
- for (String collection : collectionList) {
- CatalogTable catalogTable = getCatalogTable(client, database,
collection);
- map.put(TablePath.of(database, collection), catalogTable);
+ Map<TablePath, CatalogTable> map = new HashMap<>();
+ for (String collection : collectionList) {
+ CatalogTable catalogTable = getCatalogTable(client, database,
collection);
+ map.put(TablePath.of(database, collection), catalogTable);
+ }
+ return map;
+ } catch (Exception e) {
+ throw new CatalogException(e.getMessage(), e);
+ } finally {
+ if (client != null) {
+ client.close();
+ }
}
- return map;
}
public static CatalogTable getCatalogTable(