This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push:
new c09a1588 [Feature]Support ES query index parameters (#254)
c09a1588 is described below
commit c09a1588cf9fbefc5eab3df41b5082e1aa4d4251
Author: Jast <[email protected]>
AuthorDate: Tue Jan 7 13:50:06 2025 +0800
[Feature]Support ES query index parameters (#254)
---
.../ElasticSearchDataSourceChannel.java | 3 +-
.../plugin/elasticsearch/client/EsRestClient.java | 60 ++++++++++++++--------
2 files changed, 41 insertions(+), 22 deletions(-)
diff --git
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannel.java
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannel.java
index 77c12f29..29c84720 100644
---
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannel.java
+++
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannel.java
@@ -60,9 +60,10 @@ public class ElasticSearchDataSourceChannel implements
DataSourceChannel {
String database,
Map<String, String> option) {
databaseCheck(database);
+
try (EsRestClient client =
EsRestClient.createInstance(ConfigFactory.parseMap(requestParams))) {
- return client.listIndex();
+ return client.listIndex(option.get("filterName"));
}
}
diff --git
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java
index 01d20bb2..64cb1002 100644
---
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java
+++
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.common.utils.JsonUtils;
import
org.apache.seatunnel.datasource.plugin.elasticsearch.ElasticSearchOptionRule;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
@@ -252,27 +253,7 @@ public class EsRestClient implements AutoCloseable {
}
public List<String> listIndex() {
- String endpoint = "/_cat/indices?format=json";
- Request request = new Request("GET", endpoint);
- try {
- Response response = restClient.performRequest(request);
- if (response == null) {
- throw new ResponseException("GET " + endpoint + " response
null");
- }
- if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
- String entity = EntityUtils.toString(response.getEntity());
- return JsonUtils.toList(entity, Map.class).stream()
- .map(map -> map.get("index").toString())
- .collect(Collectors.toList());
- } else {
- throw new ResponseException(
- String.format(
- "GET %s response status code=%d",
- endpoint,
response.getStatusLine().getStatusCode()));
- }
- } catch (IOException ex) {
- throw new ResponseException(ex);
- }
+ return this.listIndex(null);
}
public void dropIndex(String tableName) {
@@ -365,4 +346,41 @@ public class EsRestClient implements AutoCloseable {
}
return mapping;
}
+
+ public List<String> listIndex(String filterName) {
+ String endpoint = "/_cat/indices?format=json";
+ Request request = new Request("GET", endpoint);
+ try {
+ Response response = restClient.performRequest(request);
+ if (response == null) {
+ throw new ResponseException("GET " + endpoint + " response
null");
+ }
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+ String entity = EntityUtils.toString(response.getEntity());
+ List<String> indices =
+ JsonUtils.toList(entity, Map.class).stream()
+ .map(map -> map.get("index").toString())
+ .collect(Collectors.toList());
+
+ if (StringUtils.isNotEmpty(filterName)) {
+ indices =
+ indices.stream()
+ .filter(
+ index ->
+ index.toLowerCase()
+
.contains(filterName.toLowerCase()))
+ .collect(Collectors.toList());
+ }
+
+ return indices;
+ } else {
+ throw new ResponseException(
+ String.format(
+ "GET %s response status code=%d",
+ endpoint,
response.getStatusLine().getStatusCode()));
+ }
+ } catch (IOException ex) {
+ throw new ResponseException(ex);
+ }
+ }
}