FuYouJ commented on code in PR #7502:
URL: https://github.com/apache/seatunnel/pull/7502#discussion_r1739630810
##########
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java:
##########
@@ -58,28 +59,51 @@ public class ElasticsearchSource
SupportParallelism,
SupportColumnProjection {
- private final ReadonlyConfig config;
+ private final List<SourceConfig> sourceConfigList;
+ private final ReadonlyConfig connectionConfig;
- private CatalogTable catalogTable;
+ public ElasticsearchSource(ReadonlyConfig config) {
+ this.connectionConfig = config;
+ if (config.getOptional(SourceConfig.INDEX_LIST).isPresent()) {
+ this.sourceConfigList = createMultiSource(config);
+ } else {
+ this.sourceConfigList =
Collections.singletonList(parseOneIndexQueryConfig(config));
+ }
+ }
- private List<String> source;
+ private List<SourceConfig> createMultiSource(ReadonlyConfig config) {
+ List<Map<String, Object>> configMaps =
config.get(SourceConfig.INDEX_LIST);
+ List<ReadonlyConfig> configList =
+
configMaps.stream().map(ReadonlyConfig::fromMap).collect(Collectors.toList());
+ List<SourceConfig> sourceConfigList = new
ArrayList<>(configList.size());
+ for (ReadonlyConfig readonlyConfig : configList) {
+ SourceConfig sourceConfig =
parseOneIndexQueryConfig(readonlyConfig);
+ sourceConfigList.add(sourceConfig);
+ }
+ return sourceConfigList;
+ }
- private Map<String, String> arrayColumn;
+ private SourceConfig parseOneIndexQueryConfig(ReadonlyConfig
readonlyConfig) {
- public ElasticsearchSource(ReadonlyConfig config) {
- this.config = config;
- if (config.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
+ Map<String, Object> query = readonlyConfig.get(SourceConfig.QUERY);
+ String index = readonlyConfig.get(SourceConfig.INDEX);
+
+ CatalogTable catalogTable;
+ List<String> source;
+ Map<String, String> arrayColumn;
+
+ if (readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent())
{
// todo: We need to remove the schema in ES.
log.warn(
- "The schema config in ElasticSearch sink is deprecated,
please use source config instead!");
- catalogTable = CatalogTableUtil.buildWithConfig(config);
+ "The schema config in ElasticSearch source/sink is
deprecated, please use source config instead!");
+ catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig);
source =
Arrays.asList(catalogTable.getSeaTunnelRowType().getFieldNames());
} else {
- source = config.get(SourceConfig.SOURCE);
- arrayColumn = config.get(SourceConfig.ARRAY_COLUMN);
- EsRestClient esRestClient = EsRestClient.createInstance(config);
+ source = readonlyConfig.get(SourceConfig.SOURCE);
+ arrayColumn = readonlyConfig.get(SourceConfig.ARRAY_COLUMN);
+ EsRestClient esRestClient =
EsRestClient.createInstance(connectionConfig);
Map<String, BasicTypeDefine<EsType>> esFieldType =
-
esRestClient.getFieldTypeMapping(config.get(SourceConfig.INDEX), source);
+ esRestClient.getFieldTypeMapping(index, source);
esRestClient.close();
Review Comment:
I implemented Closeable for EsRestClient and extracted the code into a
separate method to ensure that resources are closed even if an exception occurs:
```
private Map<String, BasicTypeDefine<EsType>> getFieldTypeMapping(
String index, List<String> source) {
// EsRestClient#getFieldTypeMapping may throw a runtime exception,
// so we use try-with-resources here to make sure the client is closed
try (EsRestClient esRestClient =
EsRestClient.createInstance(connectionConfig)) {
return esRestClient.getFieldTypeMapping(index, source);
}
}
```
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java:
##########
@@ -394,6 +434,12 @@ private List<String>
getDocsWithTransformTimestamp(List<String> source, String i
}
private List<String> getDocsWithTransformDate(List<String> source, String
index) {
+ return getDocsWithTransformDate(source, index,
Collections.emptyList());
+ }
+
+ //
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]