This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git


The following commit(s) were added to refs/heads/main by this push:
     new c09a1588 [Feature]Support ES query index parameters (#254)
c09a1588 is described below

commit c09a1588cf9fbefc5eab3df41b5082e1aa4d4251
Author: Jast <[email protected]>
AuthorDate: Tue Jan 7 13:50:06 2025 +0800

    [Feature]Support ES query index parameters (#254)
---
 .../ElasticSearchDataSourceChannel.java            |  3 +-
 .../plugin/elasticsearch/client/EsRestClient.java  | 60 ++++++++++++++--------
 2 files changed, 41 insertions(+), 22 deletions(-)

diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannel.java
index 77c12f29..29c84720 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannel.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannel.java
@@ -60,9 +60,10 @@ public class ElasticSearchDataSourceChannel implements DataSourceChannel {
             String database,
             Map<String, String> option) {
         databaseCheck(database);
+
         try (EsRestClient client =
                 EsRestClient.createInstance(ConfigFactory.parseMap(requestParams))) {
-            return client.listIndex();
+            return client.listIndex(option.get("filterName"));
         }
     }
 
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java
index 01d20bb2..64cb1002 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.datasource.plugin.elasticsearch.ElasticSearchOptionRule;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpHost;
 import org.apache.http.HttpStatus;
 import org.apache.http.auth.AuthScope;
@@ -252,27 +253,7 @@ public class EsRestClient implements AutoCloseable {
     }
 
     public List<String> listIndex() {
-        String endpoint = "/_cat/indices?format=json";
-        Request request = new Request("GET", endpoint);
-        try {
-            Response response = restClient.performRequest(request);
-            if (response == null) {
-                throw new ResponseException("GET " + endpoint + " response null");
-            }
-            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
-                String entity = EntityUtils.toString(response.getEntity());
-                return JsonUtils.toList(entity, Map.class).stream()
-                        .map(map -> map.get("index").toString())
-                        .collect(Collectors.toList());
-            } else {
-                throw new ResponseException(
-                        String.format(
-                                "GET %s response status code=%d",
-                                endpoint, response.getStatusLine().getStatusCode()));
-            }
-        } catch (IOException ex) {
-            throw new ResponseException(ex);
-        }
+        return this.listIndex(null);
     }
 
     public void dropIndex(String tableName) {
@@ -365,4 +346,41 @@ public class EsRestClient implements AutoCloseable {
         }
         return mapping;
     }
+
+    public List<String> listIndex(String filterName) {
+        String endpoint = "/_cat/indices?format=json";
+        Request request = new Request("GET", endpoint);
+        try {
+            Response response = restClient.performRequest(request);
+            if (response == null) {
+                throw new ResponseException("GET " + endpoint + " response null");
+            }
+            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+                String entity = EntityUtils.toString(response.getEntity());
+                List<String> indices =
+                        JsonUtils.toList(entity, Map.class).stream()
+                                .map(map -> map.get("index").toString())
+                                .collect(Collectors.toList());
+
+                if (StringUtils.isNotEmpty(filterName)) {
+                    indices =
+                            indices.stream()
+                                    .filter(
+                                            index ->
+                                                    index.toLowerCase()
+                                                            .contains(filterName.toLowerCase()))
+                                    .collect(Collectors.toList());
+                }
+
+                return indices;
+            } else {
+                throw new ResponseException(
+                        String.format(
+                                "GET %s response status code=%d",
+                                endpoint, response.getStatusLine().getStatusCode()));
+            }
+        } catch (IOException ex) {
+            throw new ResponseException(ex);
+        }
+    }
 }

Reply via email to