This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f84ad372a4 [INLONG-8676][Manager] Elasticsearch - Modify the calling 
method from SDK to HTTP  (#9057)
f84ad372a4 is described below

commit f84ad372a49aa3ce1eb8d1f3850201e8c57278bc
Author: haibo.duan <[email protected]>
AuthorDate: Fri Mar 15 15:01:18 2024 +0800

    [INLONG-8676][Manager] Elasticsearch - Modify the calling method from SDK 
to HTTP  (#9057)
---
 .../inlong/manager/common/util/HttpUtils.java      |  18 ++
 .../es/ElasticsearchAggregationsTermsInfo.java     |  71 +++++
 .../pojo/node/es/ElasticsearchQueryInfo.java       |  91 +++++++
 .../pojo/node/es/ElasticsearchQuerySortInfo.java   |  50 ++++
 .../manager/pojo/node/es/ElasticsearchRequest.java |  57 ++++
 .../sink/es/ElasticsearchCreateIndexResponse.java  |  32 +++
 .../sink/es/ElasticsearchIndexMappingInfo.java     |  84 ++++++
 inlong-manager/manager-service/pom.xml             |  40 +--
 .../service/core/impl/AgentServiceImpl.java        |   2 +-
 .../service/core/impl/AuditServiceImpl.java        | 140 +++++++---
 .../service/group/AbstractGroupOperator.java       |   2 +-
 .../node/es/ElasticsearchDataNodeOperator.java     |   3 +-
 .../service/resource/sink/es/ElasticsearchApi.java | 289 ++++++++++++---------
 .../resource/sink/es/ElasticsearchConfig.java      | 106 +++++---
 .../manager/service/sink/ElasticsearchApiTest.java | 248 ++++++++++++++++++
 15 files changed, 996 insertions(+), 237 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
index 5f5d492da6..85a971e706 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
@@ -185,6 +185,24 @@ public class HttpUtils {
         }
     }
 
+    /**
+     * Send HEAD request to the specified URL.
+     */
+    public static boolean headRequest(RestTemplate restTemplate, String url, 
Map<String, Object> params,
+            HttpHeaders header) {
+        ResponseEntity<String> exchange;
+        boolean result = false;
+        HttpEntity<String> request = new HttpEntity(params, header);
+        log.debug("send request to {}, param {}", url, params);
+        exchange = restTemplate.exchange(url, HttpMethod.HEAD, request, 
String.class);
+        HttpStatus statusCode = exchange.getStatusCode();
+        if (statusCode.is2xxSuccessful()) {
+            result = statusCode.is2xxSuccessful();
+        }
+        log.debug("success request to {},  status code {}, body {}", url, 
statusCode, exchange.getBody());
+        return result;
+    }
+
     /**
      * Send GET request to the specified URL.
      */
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchAggregationsTermsInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchAggregationsTermsInfo.java
new file mode 100644
index 0000000000..f5acc844d7
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchAggregationsTermsInfo.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.es;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ElasticsearchAggregationsTermsInfo {
+
+    /**
+     *  Aggregations terms field.
+     */
+    private String field;
+
+    /**
+     * Aggregations size.
+     */
+    private int size;
+
+    /**
+     * Aggregations info.
+     */
+    private Map<String, Sum> aggregations;
+
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class Sum {
+
+        /**
+         * Aggregations sum field.
+         */
+        private Field sum;
+    }
+
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class Field {
+
+        /**
+         * Aggregations field value.
+         */
+        private String field;
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchQueryInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchQueryInfo.java
new file mode 100644
index 0000000000..d97cae54b1
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchQueryInfo.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.es;
+
+import com.google.gson.annotations.SerializedName;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+import java.util.Map;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ElasticsearchQueryInfo {
+
+    /**
+     *  Bool query value.
+     */
+    private QueryBool bool;
+
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class QueryBool {
+
+        /**
+         * Must query value.
+         */
+        private List<QueryTerm> must;
+
+        /**
+         *  Use the boost parameter to boost search results.
+         */
+        private double boost;
+
+        /**
+         * adjust pure negative.
+         */
+        @SerializedName("adjust_pure_negative")
+        private boolean adjustPureNegative;
+    }
+
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class QueryTerm {
+
+        /**
+         * Term query value.
+         */
+        private Map<String, TermValue> term;
+    }
+
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class TermValue {
+
+        /**
+         * Term query value.
+         */
+        private String value;
+
+        /**
+         *  Use the boost parameter to boost search results.
+         */
+        private double boost;
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchQuerySortInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchQuerySortInfo.java
new file mode 100644
index 0000000000..b34f822ed4
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchQuerySortInfo.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.es;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+import java.util.Map;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ElasticsearchQuerySortInfo {
+
+    /**
+     * Query sort list.
+     */
+    private List<Map<String, SortValue>> sort;
+
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class SortValue {
+
+        /**
+         *  Sort value.
+         */
+        private String order;
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchRequest.java
new file mode 100644
index 0000000000..42fc348757
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchRequest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.es;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ElasticsearchRequest {
+
+    /**
+     * From index to start the search from. Defaults to 0.
+     */
+    private int from;
+
+    /**
+     * The number of search hits to return. Defaults to 10.
+     */
+    private int size;
+
+    /**
+     * Sets the search query for this request.
+     */
+    private ElasticsearchQueryInfo query;
+
+    /**
+     * Adds a sort against the given field name and the sort ordering.
+     */
+    private ElasticsearchQuerySortInfo sort;
+
+    /**
+     * Add an aggregation to perform as part of the search.
+     */
+    private Map<String, Map<String, ElasticsearchAggregationsTermsInfo>> 
aggregations;
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchCreateIndexResponse.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchCreateIndexResponse.java
new file mode 100644
index 0000000000..38cb17eba7
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchCreateIndexResponse.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.es;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class ElasticsearchCreateIndexResponse {
+
+    private String acknowledged;
+    private String shardsAcknowledged;
+    private String index;
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchIndexMappingInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchIndexMappingInfo.java
new file mode 100644
index 0000000000..f66986b956
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchIndexMappingInfo.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.es;
+
+import com.google.gson.annotations.SerializedName;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ElasticsearchIndexMappingInfo {
+
+    /**
+     *  mapping.
+     */
+    private IndexMappings mappings;
+
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class IndexMappings {
+
+        /**
+         * properties.
+         */
+        private Map<String, IndexField> properties;
+    }
+
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class IndexField {
+
+        /**
+         *  Index field type.
+         */
+        private String type;
+
+        /**
+         * Index text type field analyzer.
+         */
+        private String analyzer;
+
+        /**
+         * Index text type field search analyzer.
+         */
+        @SerializedName("search_analyzer")
+        private String searchAnalyzer;
+
+        /**
+         * Index date type field search format.
+         */
+        private String format;
+
+        /**
+         * Index scaled_float type scaling factor.
+         */
+        @SerializedName("scaling_factor")
+        private String scalingFactor;
+    }
+}
diff --git a/inlong-manager/manager-service/pom.xml 
b/inlong-manager/manager-service/pom.xml
index 2a503a4a90..1a0d975776 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -225,38 +225,6 @@
             <artifactId>hive-standalone-metastore</artifactId>
             <version>${hive3x.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.elasticsearch.client</groupId>
-            <artifactId>elasticsearch-rest-high-level-client</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.elasticsearch</groupId>
-            <artifactId>elasticsearch</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>jackson-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.yaml</groupId>
-                    <artifactId>snakeyaml</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.carrotsearch</groupId>
-                    <artifactId>hppc</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.elasticsearch.client</groupId>
-            <artifactId>elasticsearch-rest-client-sniffer</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>jackson-core</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
 
         <dependency>
             <groupId>com.fasterxml.jackson.dataformat</groupId>
@@ -507,12 +475,20 @@
             <version>${testcontainers.version}</version>
             <scope>test</scope>
         </dependency>
+
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>pulsar</artifactId>
             <version>${testcontainers.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 3b1741deb3..953df33e96 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -65,13 +65,13 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.gson.Gson;
 import lombok.Getter;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.elasticsearch.common.util.set.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index a2163ce9ac..281dafe170 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -43,6 +43,14 @@ import org.apache.inlong.manager.pojo.audit.AuditRequest;
 import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
 import org.apache.inlong.manager.pojo.audit.AuditSourceResponse;
 import org.apache.inlong.manager.pojo.audit.AuditVO;
+import 
org.apache.inlong.manager.pojo.node.es.ElasticsearchAggregationsTermsInfo;
+import 
org.apache.inlong.manager.pojo.node.es.ElasticsearchAggregationsTermsInfo.Field;
+import 
org.apache.inlong.manager.pojo.node.es.ElasticsearchAggregationsTermsInfo.Sum;
+import org.apache.inlong.manager.pojo.node.es.ElasticsearchQueryInfo;
+import org.apache.inlong.manager.pojo.node.es.ElasticsearchQueryInfo.QueryBool;
+import org.apache.inlong.manager.pojo.node.es.ElasticsearchQuerySortInfo;
+import 
org.apache.inlong.manager.pojo.node.es.ElasticsearchQuerySortInfo.SortValue;
+import org.apache.inlong.manager.pojo.node.es.ElasticsearchRequest;
 import org.apache.inlong.manager.pojo.user.LoginUserUtils;
 import org.apache.inlong.manager.pojo.user.UserRoleCode;
 import org.apache.inlong.manager.service.audit.InlongAuditSourceOperator;
@@ -51,19 +59,15 @@ import org.apache.inlong.manager.service.core.AuditService;
 import org.apache.inlong.manager.service.resource.sink.ck.ClickHouseConfig;
 import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.ibatis.jdbc.SQL;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.search.aggregations.Aggregation;
-import org.elasticsearch.search.aggregations.AggregationBuilders;
-import org.elasticsearch.search.aggregations.bucket.terms.ParsedTerms;
-import 
org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
-import org.elasticsearch.search.aggregations.metrics.sum.ParsedSum;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.elasticsearch.search.sort.SortOrder;
 import org.joda.time.DateTime;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
@@ -94,8 +98,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
-import static org.elasticsearch.index.query.QueryBuilders.termQuery;
-
 /**
  * Audit service layer implementation
  */
@@ -104,6 +106,7 @@ import static 
org.elasticsearch.index.query.QueryBuilders.termQuery;
 public class AuditServiceImpl implements AuditService {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AuditServiceImpl.class);
+    private static final Gson GSON = new GsonBuilder().create();
     private static final String SECOND_FORMAT = "yyyy-MM-dd HH:mm:ss";
     private static final String HOUR_FORMAT = "yyyy-MM-dd HH";
     private static final String DAY_FORMAT = "yyyy-MM-dd";
@@ -111,6 +114,24 @@ public class AuditServiceImpl implements AuditService {
     private static final DateTimeFormatter HOUR_DATE_FORMATTER = 
DateTimeFormat.forPattern(HOUR_FORMAT);
     private static final DateTimeFormatter DAY_DATE_FORMATTER = 
DateTimeFormat.forPattern(DAY_FORMAT);
 
+    private static final double DEFAULT_BOOST = 1.0;
+    private static final boolean ADJUST_PURE_NEGATIVE = true;
+    private static final int QUERY_FROM = 0;
+    private static final int QUERY_SIZE = 0;
+    private static final String SORT_ORDER = "ASC";
+    private static final String TERM_FILED = "log_ts";
+    private static final String AGGREGATIONS_COUNT = "count";
+    private static final String AGGREGATIONS_DELAY = "delay";
+    private static final String AGGREGATIONS = "aggregations";
+    private static final String BUCKETS = "buckets";
+    private static final String KEY = "key";
+    private static final String VALUE = "value";
+    private static final String INLONG_GROUP_ID = "inlong_group_id";
+    private static final String INLONG_STREAM_ID = "inlong_stream_id";
+    private static final String COUNT = "count";
+    private static final String DELAY = "delay";
+    private static final String TERMS = "terms";
+
     // key: type of audit base item, value: entity of audit base item
     private final Map<String, AuditBaseEntity> auditSentItemMap = new 
ConcurrentHashMap<>();
 
@@ -323,18 +344,23 @@ public class AuditServiceImpl implements AuditService {
                     LOGGER.warn("elasticsearch index={} not exists", index);
                     continue;
                 }
-                SearchResponse response = 
elasticsearchApi.search(toAuditSearchRequest(index, groupId, streamId));
-                final List<Aggregation> aggregations = 
response.getAggregations().asList();
-                if (CollectionUtils.isNotEmpty(aggregations)) {
-                    ParsedTerms terms = (ParsedTerms) aggregations.get(0);
-                    if (CollectionUtils.isNotEmpty(terms.getBuckets())) {
-                        List<AuditInfo> auditSet = 
terms.getBuckets().stream().map(bucket -> {
+                JsonObject response = elasticsearchApi.search(index, 
toAuditSearchRequestJson(groupId, streamId));
+                JsonObject aggregations = 
response.getAsJsonObject(AGGREGATIONS).getAsJsonObject(TERM_FILED);
+                if (!aggregations.isJsonNull()) {
+                    JsonObject logTs = 
aggregations.getAsJsonObject(TERM_FILED);
+                    if (!logTs.isJsonNull()) {
+                        JsonArray buckets = logTs.getAsJsonArray(BUCKETS);
+                        List<AuditInfo> auditSet = new ArrayList<>();
+                        for (int i = 0; i < buckets.size(); i++) {
+                            JsonObject bucket = 
buckets.get(i).getAsJsonObject();
                             AuditInfo vo = new AuditInfo();
-                            vo.setLogTs(bucket.getKeyAsString());
-                            vo.setCount((long) ((ParsedSum) 
bucket.getAggregations().asList().get(0)).getValue());
-                            vo.setDelay((long) ((ParsedSum) 
bucket.getAggregations().asList().get(1)).getValue());
-                            return vo;
-                        }).collect(Collectors.toList());
+                            vo.setLogTs(bucket.get(KEY).getAsString());
+                            vo.setCount((long) 
bucket.get(AGGREGATIONS_COUNT).getAsJsonObject().get(VALUE)
+                                    .getAsLong());
+                            vo.setDelay((long) 
bucket.get(AGGREGATIONS_DELAY).getAsJsonObject().get(VALUE)
+                                    .getAsLong());
+                            auditSet.add(vo);
+                        }
                         result.add(new AuditVO(auditId, auditName, auditSet, 
auditIdMap.getOrDefault(auditId, null)));
                     }
                 }
@@ -458,27 +484,61 @@ public class AuditServiceImpl implements AuditService {
     }
 
     /**
-     * Convert to elasticsearch search request
+     * Convert to elasticsearch search request json
      *
-     * @param index The index of elasticsearch
      * @param groupId The groupId of inlong
      * @param streamId The streamId of inlong
-     * @return The search request of elasticsearch
+     * @return The search request of elasticsearch json
      */
-    private SearchRequest toAuditSearchRequest(String index, String groupId, 
String streamId) {
-        TermsAggregationBuilder builder = 
AggregationBuilders.terms("log_ts").field("log_ts")
-                
.size(Integer.MAX_VALUE).subAggregation(AggregationBuilders.sum("count").field("count"))
-                
.subAggregation(AggregationBuilders.sum("delay").field("delay"));
-        BoolQueryBuilder filterBuilder = new BoolQueryBuilder();
-        filterBuilder.must(termQuery("inlong_group_id", groupId));
-        filterBuilder.must(termQuery("inlong_stream_id", streamId));
-        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
-        sourceBuilder.aggregation(builder);
-        sourceBuilder.query(filterBuilder);
-        sourceBuilder.from(0);
-        sourceBuilder.size(0);
-        sourceBuilder.sort("log_ts", SortOrder.ASC);
-        return new SearchRequest(new String[]{index}, sourceBuilder);
+    public static JsonObject toAuditSearchRequestJson(String groupId, String 
streamId) {
+        Map<String, ElasticsearchQueryInfo.TermValue> groupIdMap = 
Maps.newHashMap();
+        groupIdMap.put(INLONG_GROUP_ID, new 
ElasticsearchQueryInfo.TermValue(groupId, DEFAULT_BOOST));
+        ElasticsearchQueryInfo.QueryTerm groupIdTerm = 
ElasticsearchQueryInfo.QueryTerm.builder().term(groupIdMap)
+                .build();
+        Map<String, ElasticsearchQueryInfo.TermValue> streamIdMap = 
Maps.newHashMap();
+        streamIdMap.put(INLONG_STREAM_ID, new 
ElasticsearchQueryInfo.TermValue(streamId, DEFAULT_BOOST));
+        ElasticsearchQueryInfo.QueryTerm streamIdTerm = 
ElasticsearchQueryInfo.QueryTerm.builder().term(streamIdMap)
+                .build();
+        QueryBool boolInfo = QueryBool.builder()
+                .must(Lists.newArrayList(groupIdTerm, streamIdTerm))
+                .boost(DEFAULT_BOOST)
+                .adjustPureNegative(ADJUST_PURE_NEGATIVE)
+                .build();
+        ElasticsearchQueryInfo queryInfo = 
ElasticsearchQueryInfo.builder().bool(boolInfo).build();
+
+        Map<String, SortValue> termValueInfoMap = Maps.newHashMap();
+        termValueInfoMap.put(TERM_FILED, new SortValue(SORT_ORDER));
+        List<Map<String, SortValue>> list = 
Lists.newArrayList(termValueInfoMap);
+        ElasticsearchQuerySortInfo sortInfo = 
ElasticsearchQuerySortInfo.builder().sort(list).build();
+
+        Sum countSum = Sum.builder()
+                .sum(new Field(COUNT))
+                .build();
+        Sum delaySum = Sum.builder()
+                .sum(new Field(DELAY))
+                .build();
+        Map<String, Sum> aggregations = Maps.newHashMap();
+        aggregations.put(COUNT, countSum);
+        aggregations.put(DELAY, delaySum);
+        ElasticsearchAggregationsTermsInfo termsInfo = 
ElasticsearchAggregationsTermsInfo.builder()
+                .field(TERM_FILED)
+                .size(Integer.MAX_VALUE)
+                .aggregations(aggregations)
+                .build();
+        Map<String, ElasticsearchAggregationsTermsInfo> terms = 
Maps.newHashMap();
+        terms.put(TERMS, termsInfo);
+        Map<String, Map<String, ElasticsearchAggregationsTermsInfo>> logTs = 
Maps.newHashMap();
+        logTs.put(TERM_FILED, terms);
+
+        ElasticsearchRequest request = ElasticsearchRequest.builder()
+                .from(QUERY_FROM)
+                .size(QUERY_SIZE)
+                .query(queryInfo)
+                .sort(sortInfo)
+                .aggregations(logTs)
+                .build();
+
+        return GSON.toJsonTree(request).getAsJsonObject();
     }
 
     /**
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
index bd2779bbc2..5c537519be 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
@@ -37,8 +37,8 @@ import 
org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.stream.InlongStreamService;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
-import org.elasticsearch.common.util.set.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
index 7c25f0502a..52616f604e 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
@@ -34,7 +34,6 @@ import 
org.apache.inlong.manager.service.resource.sink.es.ElasticsearchConfig;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
-import org.elasticsearch.client.RequestOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -108,7 +107,7 @@ public class ElasticsearchDataNodeOperator extends 
AbstractDataNodeOperator {
         client.setEsConfig(config);
         boolean result;
         try {
-            result = client.getEsClient().ping(RequestOptions.DEFAULT);
+            result = client.ping();
             LOGGER.info("elasticsearch connection is {} for url={}, 
username={}, password={}", result, url, username,
                     password);
             return result;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
index 266655d4dc..e88bb102b3 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
@@ -17,28 +17,32 @@
 
 package org.apache.inlong.manager.service.resource.sink.es;
 
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.util.HttpUtils;
+import org.apache.inlong.manager.pojo.sink.es.ElasticsearchCreateIndexResponse;
 import org.apache.inlong.manager.pojo.sink.es.ElasticsearchFieldInfo;
+import org.apache.inlong.manager.pojo.sink.es.ElasticsearchIndexMappingInfo;
+import 
org.apache.inlong.manager.pojo.sink.es.ElasticsearchIndexMappingInfo.IndexField;
+import 
org.apache.inlong.manager.pojo.sink.es.ElasticsearchIndexMappingInfo.IndexMappings;
 
-import org.apache.commons.collections.CollectionUtils;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.indices.CreateIndexRequest;
-import org.elasticsearch.client.indices.CreateIndexResponse;
-import org.elasticsearch.client.indices.GetMappingsRequest;
-import org.elasticsearch.client.indices.PutMappingRequest;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.common.xcontent.XContentType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
 import org.springframework.stereotype.Component;
+import org.springframework.web.client.HttpClientErrorException.NotFound;
+import sun.misc.BASE64Encoder;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -46,194 +50,233 @@ import java.util.Map;
 /**
  * elasticsearch template service
  */
+@Slf4j
 @Component
 public class ElasticsearchApi {
 
+    private static final Gson GSON = new GsonBuilder().create();
+
+    private static final String MAPPINGS_KEY = "mappings";
+
     private static final String FIELD_KEY = "properties";
 
+    private static final String CONTENT_TYPE_KEY = "Content-Type";
+
+    private static final String FIELD_TYPE = "type";
+
+    private static final String FIELD_FORMAT = "format";
+
+    private static final String CONTENT_TYPE_VALUE = 
"application/json;charset=UTF-8";
+
     private static final Logger LOG = 
LoggerFactory.getLogger(ElasticsearchApi.class);
 
     @Autowired
     private ElasticsearchConfig esConfig;
 
     /**
-     * Search
+     * Get http headers by token.
      *
-     * @param searchRequest The search request of Elasticsearch
-     * @return Search reponse of Elasticsearch
-     * @throws IOException The io exception may throws
+     * @return http header infos
      */
-    public SearchResponse search(SearchRequest searchRequest) throws 
IOException {
-        return search(searchRequest, RequestOptions.DEFAULT);
+    private HttpHeaders getHttpHeaders() {
+        HttpHeaders headers = new HttpHeaders();
+        headers.add(CONTENT_TYPE_KEY, CONTENT_TYPE_VALUE);
+        if (esConfig.getAuthEnable()) {
+            if (StringUtils.isNotEmpty(esConfig.getUsername()) && 
StringUtils.isNotEmpty(esConfig.getPassword())) {
+                String tokenStr = esConfig.getUsername() + ":" + 
esConfig.getPassword();
+                String token = String.valueOf(new 
BASE64Encoder().encode(tokenStr.getBytes(StandardCharsets.UTF_8)));
+                headers.add("Authorization", "Basic " + token);
+            }
+        }
+        return headers;
     }
 
     /**
-     * Search
+     * Search.
      *
-     * @param searchRequest The search request of Elasticsearch
-     * @param options The options of Elasticsearch
-     * @return Search reponse of Elasticsearch
-     * @throws IOException The io exception may throws
+     * @param indexName The index name
+     * @param request The request json
+     * @return the elasticsearch seqrch result
+     * @throws Exception any exception if occurred
      */
-    public SearchResponse search(SearchRequest searchRequest, RequestOptions 
options) throws IOException {
-        LOG.info("get es search request of {}", 
searchRequest.source().toString());
-        return getEsClient().search(searchRequest, options);
+    public JsonObject search(String indexName, JsonObject request) throws 
Exception {
+        LOG.info("get es search es index:{} request:{}", indexName, 
request.toString());
+        final String url = esConfig.getOneHttpUrl() + InlongConstants.SLASH + 
indexName + "/_search";
+        return HttpUtils.request(esConfig.getRestClient(), url, 
HttpMethod.POST, request.toString(), getHttpHeaders(),
+                JsonObject.class);
     }
 
     /**
-     * Check index exists
+     * Check index exists.
      *
-     * @param indexName The index name of Elasticsearch
-     * @return true if exists else false
-     * @throws IOException The exception may throws
+     * @param indexName The elasticsearch index name
+     * @return true or false
+     * @throws Exception any exception if occurred
      */
-    public boolean indexExists(String indexName) throws IOException {
-        GetIndexRequest getIndexRequest = new GetIndexRequest();
-        getIndexRequest.indices(indexName);
-        return getEsClient().indices().exists(getIndexRequest, 
RequestOptions.DEFAULT);
+    public boolean indexExists(String indexName) throws Exception {
+        final String url = esConfig.getOneHttpUrl() + InlongConstants.SLASH + 
indexName;
+        try {
+            return HttpUtils.headRequest(esConfig.getRestClient(), url, null, 
getHttpHeaders());
+        } catch (NotFound e) {
+            return false;
+        }
     }
 
     /**
-     * Create index
+     * Check if the cluster is available.
      *
-     * @param indexName The index name of Elasticsearch
-     * @throws IOException The exception may throws
+     * @return true or false
      */
-    public void createIndex(String indexName) throws IOException {
-        CreateIndexRequest createIndexRequest = new 
CreateIndexRequest(indexName);
+    public boolean ping() throws Exception {
+        final String[] urls = esConfig.getHttpUrls(InlongConstants.SLASH);
+        boolean result = true;
+        for (String url : urls) {
+            result &= HttpUtils.headRequest(esConfig.getRestClient(), url, 
null, getHttpHeaders());
+        }
+        return result;
+    }
 
-        CreateIndexResponse createIndexResponse = getEsClient().indices()
-                .create(createIndexRequest, RequestOptions.DEFAULT);
-        LOG.info("create es index:{} result: {}", indexName, 
createIndexResponse.isAcknowledged());
+    /**
+     * Create index by REST API.
+     *
+     * @param indexName elasticsearch index name
+     * @throws IOException any IOException if occurred
+     */
+    public void createIndex(String indexName) throws Exception {
+        final String url = esConfig.getOneHttpUrl() + InlongConstants.SLASH + 
indexName;
+        ElasticsearchCreateIndexResponse response = 
HttpUtils.request(esConfig.getRestClient(), url, HttpMethod.PUT,
+                null, getHttpHeaders(), 
ElasticsearchCreateIndexResponse.class);
+        LOG.info("create es index:{} result: {}", indexName, 
response.getAcknowledged());
     }
 
     /**
-     * Get mapping info
+     * Get mapping info.
      *
      * @param fieldsInfo The fields info of Elasticsearch
-     * @return String list of fields translation
+     * @return elasticsearch index mappings object {@link 
ElasticsearchIndexMappingInfo}
      * @throws IOException The exception may throws
      */
-    private List<String> getMappingInfo(List<ElasticsearchFieldInfo> 
fieldsInfo) {
-        List<String> fieldList = new ArrayList<>();
+    private ElasticsearchIndexMappingInfo 
getMappingInfo(List<ElasticsearchFieldInfo> fieldsInfo) {
+        Map<String, IndexField> fields = Maps.newHashMap();
         for (ElasticsearchFieldInfo field : fieldsInfo) {
-            StringBuilder fieldStr = new StringBuilder().append("        
\"").append(field.getFieldName())
-                    .append("\" : {\n          \"type\" : \"")
-                    .append(field.getFieldType()).append("\"");
+            IndexField indexField = new IndexField();
+            fields.put(field.getFieldName(), indexField);
+            indexField.setType(field.getFieldType());
             if (field.getFieldType().equals("text")) {
                 if (StringUtils.isNotEmpty(field.getAnalyzer())) {
-                    fieldStr.append(",\n          \"analyzer\" : \"")
-                            .append(field.getAnalyzer()).append("\"");
+                    indexField.setAnalyzer(field.getAnalyzer());
                 }
                 if (StringUtils.isNotEmpty(field.getSearchAnalyzer())) {
-                    fieldStr.append(",\n          \"search_analyzer\" : \"")
-                            .append(field.getSearchAnalyzer()).append("\"");
+                    indexField.setSearchAnalyzer(field.getSearchAnalyzer());
                 }
             } else if (field.getFieldType().equals("date")) {
                 if (StringUtils.isNotEmpty(field.getFieldFormat())) {
-                    fieldStr.append(",\n          \"format\" : \"")
-                            .append(field.getFieldFormat()).append("\"");
+                    indexField.setFormat(field.getFieldFormat());
                 }
             } else if (field.getFieldType().equals("scaled_float")) {
                 if (StringUtils.isNotEmpty(field.getScalingFactor())) {
-                    fieldStr.append(",\n          \"scaling_factor\" : \"")
-                            .append(field.getScalingFactor()).append("\"");
+                    indexField.setScalingFactor(field.getScalingFactor());
                 }
             }
-            fieldStr.append("\n        }");
-            fieldList.add(fieldStr.toString());
         }
-        return fieldList;
+        return 
ElasticsearchIndexMappingInfo.builder().mappings(IndexMappings.builder()
+                .properties(fields).build()).build();
     }
 
     /**
-     * Create index and mapping
+     * Create index and mapping by REST API.
      *
      * @param indexName Index name of creating
      * @param fieldInfos Field infos
-     * @throws IOException The exception may throws
+     * @throws Exception The exception may throws
      */
-    public void createIndexAndMapping(String indexName,
-            List<ElasticsearchFieldInfo> fieldInfos) throws IOException {
-        CreateIndexRequest createIndexRequest = new 
CreateIndexRequest(indexName);
-        List<String> fieldList = getMappingInfo(fieldInfos);
-        StringBuilder mapping = new StringBuilder().append("{\n      
\"properties\" : {\n")
-                .append(StringUtils.join(fieldList, ",\n")).append("\n      
}\n}");
-        createIndexRequest.mapping(mapping.toString(), XContentType.JSON);
-
-        CreateIndexResponse createIndexResponse = getEsClient().indices()
-                .create(createIndexRequest, RequestOptions.DEFAULT);
-        LOG.info("create {}:{}", indexName, 
createIndexResponse.isAcknowledged());
+    public void createIndexAndMapping(String indexName, 
List<ElasticsearchFieldInfo> fieldInfos) throws Exception {
+        ElasticsearchIndexMappingInfo mappingInfo = getMappingInfo(fieldInfos);
+        final String url = esConfig.getOneHttpUrl() + "/" + indexName;
+        ElasticsearchCreateIndexResponse response = 
HttpUtils.request(esConfig.getRestClient(), url, HttpMethod.PUT,
+                GSON.toJsonTree(mappingInfo).getAsJsonObject().toString(), 
getHttpHeaders(),
+                ElasticsearchCreateIndexResponse.class);
+        LOG.info("create {}:{}", indexName, response.getIndex());
     }
 
     /**
-     * Get fields
+     * Get mapping map.
      *
-     * @param indexName The index name of Elasticsearch
-     * @return a {@link Map} collection that contains {@link String}
-     *     as key and {@link MappingMetaData} as value.
-     * @throws IOException The exception may throws
+     * @param indexName elasticsearch index name
+     * @return map of elasticsearch index mapping info
      */
-    public Map<String, MappingMetaData> getFields(String indexName) throws 
IOException {
-        GetMappingsRequest request = new 
GetMappingsRequest().indices(indexName);
-        return getEsClient().indices().getMapping(request, 
RequestOptions.DEFAULT).mappings();
+    public Map<String, ElasticsearchFieldInfo> getMappingMap(String indexName) 
throws Exception {
+        final String url = esConfig.getOneHttpUrl() + InlongConstants.SLASH + 
indexName + "/_mapping";
+        JsonObject result = HttpUtils.request(esConfig.getRestClient(), url, 
HttpMethod.GET, null, getHttpHeaders(),
+                JsonObject.class);
+        JsonObject mappings = result.getAsJsonObject(indexName);
+        JsonObject properties = null;
+        JsonObject fields = null;
+        Map<String, ElasticsearchFieldInfo> fieldInfos = Maps.newHashMap();
+        if (ObjectUtils.isNotEmpty(mappings)) {
+            properties = mappings.getAsJsonObject(MAPPINGS_KEY);
+        }
+        if (ObjectUtils.isNotEmpty(properties)) {
+            fields = properties.getAsJsonObject(FIELD_KEY);
+        }
+        if (ObjectUtils.isNotEmpty(fields)) {
+            for (String key : fields.keySet()) {
+                JsonObject field = fields.getAsJsonObject(key);
+                if (StringUtils.isNotEmpty(key) && 
ObjectUtils.isNotEmpty(field)) {
+                    ElasticsearchFieldInfo fieldInfo = new 
ElasticsearchFieldInfo();
+                    if (ObjectUtils.isNotEmpty(field.get(FIELD_TYPE))) {
+                        
fieldInfo.setFieldType(field.get(FIELD_TYPE).getAsString());
+                    }
+                    if (ObjectUtils.isNotEmpty(field.get(FIELD_FORMAT))) {
+                        
fieldInfo.setFieldFormat(field.get(FIELD_FORMAT).getAsString());
+                    }
+                    fieldInfo.setFieldName(key);
+                    fieldInfos.put(key, fieldInfo);
+                }
+            }
+        }
+        return fieldInfos;
     }
 
     /**
-     * Add fieldss
+     * Add fields by REST API.
      *
-     * @param indexName The index name of Elasticsearch
-     * @param fieldInfos The fields info of Elasticsearch
-     * @throws IOException The exception may throws
+     * @param indexName elasticsearch index name
+     * @param fieldInfos elasticsearch field infos
+     * @throws Exception any exception if occurred
      */
-    public void addFields(String indexName, List<ElasticsearchFieldInfo> 
fieldInfos) throws IOException {
-        if (CollectionUtils.isNotEmpty(fieldInfos)) {
-            List<String> fieldList = getMappingInfo(fieldInfos);
-            StringBuilder mapping = new StringBuilder().append("{\n      
\"properties\" : {\n")
-                    .append(StringUtils.join(fieldList, ",\n")).append("\n     
 }\n}");
-            System.out.println(mapping.toString());
-            PutMappingRequest indexRequest = new PutMappingRequest(indexName)
-                    .source(mapping.toString(), XContentType.JSON);
-            AcknowledgedResponse acknowledgedResponse = getEsClient().indices()
-                    .putMapping(indexRequest, RequestOptions.DEFAULT);
-            LOG.info("put mapping: {} result: {}", mapping.toString(), 
acknowledgedResponse.toString());
+    public void addFields(String indexName, List<ElasticsearchFieldInfo> 
fieldInfos) throws Exception {
+        ElasticsearchIndexMappingInfo mappingInfo = getMappingInfo(fieldInfos);
+        if (ObjectUtils.isNotEmpty(mappingInfo) && 
!mappingInfo.getMappings().getProperties().isEmpty()) {
+            String url = esConfig.getOneHttpUrl() + InlongConstants.SLASH + 
indexName + "/_mapping";
+            HttpUtils.request(esConfig.getRestClient(), url, HttpMethod.PUT,
+                    
GSON.toJsonTree(mappingInfo.getMappings()).getAsJsonObject().toString(), 
getHttpHeaders(),
+                    Object.class);
         }
     }
 
     /**
-     * Add not exist fields
+     * Add not exist fields by REST API.
      *
-     * @param indexName The index name of elasticsearch
-     * @param fieldInfos The fields info of elasticsearch
-     * @throws IOException The exception may throws
+     * @param indexName elasticsearch index name
+     * @param fieldInfos elasticsearch field infos
+     * @throws Exception any exception if occurred
      */
-    public void addNotExistFields(String indexName,
-            List<ElasticsearchFieldInfo> fieldInfos) throws IOException {
-        List<ElasticsearchFieldInfo> notExistFieldInfos = new 
ArrayList<>(fieldInfos);
-        Map<String, MappingMetaData> mapping = getFields(indexName);
-        Map<String, Object> filedMap = (Map<String, Object>) 
mapping.get(indexName).getSourceAsMap().get(FIELD_KEY);
-        for (String key : filedMap.keySet()) {
-            for (ElasticsearchFieldInfo field : notExistFieldInfos) {
-                if (field.getFieldName().equals(key)) {
-                    notExistFieldInfos.remove(field);
-                    break;
-                }
+    public void addNotExistFields(String indexName, 
List<ElasticsearchFieldInfo> fieldInfos) throws Exception {
+        List<ElasticsearchFieldInfo> notExistFieldInfos = new ArrayList<>();
+        Map<String, ElasticsearchFieldInfo> mappings = 
getMappingMap(indexName);
+        for (ElasticsearchFieldInfo fieldInfo : fieldInfos) {
+            if (!mappings.containsKey(fieldInfo.getFieldName())) {
+                notExistFieldInfos.add(fieldInfo);
             }
         }
-        addFields(indexName, notExistFieldInfos);
-    }
-
-    /**
-     * Get Elasticsearch client
-     *
-     * @return RestHighLevelClient
-     */
-    public RestHighLevelClient getEsClient() {
-        return esConfig.highLevelClient();
+        if (!notExistFieldInfos.isEmpty()) {
+            addFields(indexName, notExistFieldInfos);
+        }
     }
 
     /**
-     * Get Elasticsearch client
+     * Get Elasticsearch configuration.
      *
      * @param config Elasticsearch's configuration
      */
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java
index 35f1272bd4..6f4580897a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java
@@ -22,20 +22,16 @@ import 
org.apache.inlong.manager.common.consts.InlongConstants;
 import lombok.Data;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 
 /**
  * Elasticsearch config information, including host, port, etc.
@@ -45,7 +41,13 @@ import java.util.List;
 public class ElasticsearchConfig {
 
     private static final Logger logger = 
LoggerFactory.getLogger(ElasticsearchConfig.class);
-    private static RestHighLevelClient highLevelClient;
+
+    private final Random rand = new Random();
+
+    @Autowired
+    private RestTemplate restTemplate;
+
+    private List<HttpHost> httpHosts;
     @Value("${es.index.search.hostname}")
     private String hosts;
     @Value("${es.auth.enable}")
@@ -56,54 +58,82 @@ public class ElasticsearchConfig {
     private String password;
 
     /**
-     * highLevelClient
+     *  Get http rest client.
      *
-     * @return RestHighLevelClient
+     * @return springframework RestTemplate
      */
-    public RestHighLevelClient highLevelClient() {
-        if (highLevelClient != null) {
-            return highLevelClient;
+    public RestTemplate getRestClient() {
+        if (restTemplate != null) {
+            return restTemplate;
         }
         try {
-            synchronized (RestHighLevelClient.class) {
-                if (highLevelClient == null) {
-                    List<HttpHost> hosts = new ArrayList<>();
-                    String[] hostArrays = 
this.hosts.split(InlongConstants.SEMICOLON);
-                    for (String host : hostArrays) {
-                        if (StringUtils.isNotBlank(host)) {
-                            host = host.trim();
-                            hosts.add(HttpHost.create(host));
-                        }
-                    }
-                    RestClientBuilder clientBuilder = 
RestClient.builder(hosts.toArray(new HttpHost[0]));
-                    this.setEsAuth(clientBuilder);
-                    highLevelClient = new RestHighLevelClient(clientBuilder);
+            synchronized (RestTemplate.class) {
+                if (restTemplate == null) {
+                    restTemplate = new RestTemplate();
                 }
             }
         } catch (Exception e) {
             logger.error("get es high level client error", e);
         }
-        return highLevelClient;
+        return restTemplate;
     }
 
     /**
-     * Elasticsearch authentication
+     * Get http hosts.
      *
-     * @param builder The builder
+     * @return list of http host info
      */
-    private void setEsAuth(RestClientBuilder builder) {
+    public List<HttpHost> getHttpHosts() {
+        if (httpHosts != null) {
+            return httpHosts;
+        }
         try {
-            logger.info("set es auth of enable={}", authEnable);
-            if (authEnable) {
-                final CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
-                credentialsProvider.setCredentials(AuthScope.ANY, new 
UsernamePasswordCredentials(username, password));
-                builder.setHttpClientConfigCallback(
-                        httpClientBuilder -> 
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
-
+            synchronized (HttpHost.class) {
+                httpHosts = new ArrayList<>();
+                String[] hostArrays = 
this.hosts.split(InlongConstants.SEMICOLON);
+                for (String host : hostArrays) {
+                    if (StringUtils.isNotBlank(host)) {
+                        host = host.trim();
+                        httpHosts.add(HttpHost.create(host));
+                    }
+                }
             }
         } catch (Exception e) {
-            logger.error("set es auth error ", e);
+            logger.error("get es http hosts error", e);
+        }
+        return httpHosts;
+    }
+
+    /**
+     * Get one http url.
+     *
+     * @return a http url
+     * @throws Exception any exception if occurred
+     */
+    public String getOneHttpUrl() throws Exception {
+        getHttpHosts();
+        if (!httpHosts.isEmpty() && httpHosts.size() > 0) {
+            return httpHosts.get(rand.nextInt(httpHosts.size())).toString();
         }
+        throw new Exception("http hosts is empty! please check hosts!");
     }
 
+    /**
+     * Get all http url.
+     *
+     * @param urlSuffix
+     * @return http url array
+     * @throws Exception
+     */
+    public String[] getHttpUrls(String urlSuffix) throws Exception {
+        getHttpHosts();
+        if (!httpHosts.isEmpty() && httpHosts.size() > 0) {
+            String[] urls = new String[httpHosts.size()];
+            for (int i = 0; i < urls.length; i++) {
+                urls[i] = httpHosts.get(i) + urlSuffix;
+            }
+            return urls;
+        }
+        throw new Exception("http hosts is empty! please check hosts!");
+    }
 }
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchApiTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchApiTest.java
new file mode 100644
index 0000000000..aabf3f008d
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchApiTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink;
+
+import org.apache.inlong.manager.pojo.sink.es.ElasticsearchFieldInfo;
+import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi;
+import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchConfig;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for {@link 
org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi}.
+ */
+public class ElasticsearchApiTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ElasticsearchApiTest.class);
+
+    public static final Network NETWORK = Network.newNetwork();
+
+    private static final String INTER_CONTAINER_ELASTICSEARCH_ALIAS = 
"elasticsearch";
+
+    private static final String ELASTICSEARCH_DOCKER_IMAGE_NAME = 
"elasticsearch:7.9.3";
+
+    private ElasticsearchApi elasticsearchApi;
+
+    private ElasticsearchConfig elasticsearchConfig;
+
+    private static final Gson GSON = new GsonBuilder().create();
+
+    private static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new 
ElasticsearchContainer(
+            DockerImageName.parse(ELASTICSEARCH_DOCKER_IMAGE_NAME)
+                    
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"))
+                            .withNetwork(NETWORK)
+                            .withAccessToHost(true)
+                            .withEnv("discovery.type", "single-node")
+                            .withEnv("ES_JAVA_OPTS", "-Xms512m -Xmx512m")
+                            .withEnv("ELASTIC_PASSWORD", "test")
+                            
.withNetworkAliases(INTER_CONTAINER_ELASTICSEARCH_ALIAS)
+                            .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+    @BeforeAll
+    public static void beforeAll() {
+        ELASTICSEARCH_CONTAINER.setPortBindings(Arrays.asList("9200:9200", 
"9300:9300"));
+        ELASTICSEARCH_CONTAINER.withStartupTimeout(Duration.ofSeconds(300));
+        
ELASTICSEARCH_CONTAINER.setDockerImageName(ELASTICSEARCH_DOCKER_IMAGE_NAME);
+        Startables.deepStart(Stream.of(ELASTICSEARCH_CONTAINER)).join();
+        LOG.info("Containers are started.");
+    }
+
+    @BeforeEach
+    public void before() {
+        elasticsearchConfig = new ElasticsearchConfig();
+        elasticsearchConfig.setHosts("http://127.0.0.1:9200";);
+        elasticsearchConfig.setAuthEnable(true);
+        elasticsearchConfig.setUsername("admin");
+        elasticsearchConfig.setPassword("inlong");
+        elasticsearchApi = new ElasticsearchApi();
+        elasticsearchApi.setEsConfig(elasticsearchConfig);
+    }
+
+    @AfterAll
+    public static void teardown() {
+        if (ELASTICSEARCH_CONTAINER != null) {
+            ELASTICSEARCH_CONTAINER.stop();
+        }
+    }
+
+    /**
+     * Test cases for {@link ElasticsearchApi#search(String, JsonObject)}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testSearch() throws Exception {
+        final String indexName = "test_search";
+        final String searchJson = "{\n  \"query\": {\n    \"match_all\": {}\n  
}\n}";
+        final JsonObject search = GSON.fromJson(searchJson, JsonObject.class);
+        elasticsearchApi.createIndex(indexName);
+        JsonObject result = elasticsearchApi.search(indexName, search);
+        assertNotNull(result);
+    }
+
+    /**
+     * Test cases for {@link ElasticsearchApi#indexExists(String)}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testIndexExists() throws Exception {
+        final String indexName = "test_index_exists";
+        boolean result = elasticsearchApi.indexExists(indexName);
+        assertEquals(false, result);
+        elasticsearchApi.createIndex(indexName);
+        result = elasticsearchApi.indexExists(indexName);
+        assertEquals(true, result);
+    }
+
+    /**
+     * Test cases for {@link ElasticsearchApi#ping()}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testPing() throws Exception {
+        boolean result = elasticsearchApi.ping();
+        assertEquals(true, result);
+    }
+
+    /**
+     * Test cases for {@link ElasticsearchApi#createIndex(String)}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testCreateIndex() throws Exception {
+        final String indexName = "test_create_index";
+        elasticsearchApi.createIndex(indexName);
+        assertTrue(elasticsearchApi.indexExists(indexName));
+    }
+
+    /**
+     * Test cases for {@link ElasticsearchApi#createIndexAndMapping(String, 
List)}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testCreateIndexAndMapping() throws Exception {
+        final String indexName = "test_create_index_and_mapping";
+        final List<ElasticsearchFieldInfo> fieldInfos = new ArrayList<>();
+        final ElasticsearchFieldInfo log_ts = new ElasticsearchFieldInfo();
+        log_ts.setFieldType("keyword");
+        log_ts.setFieldName("log_ts");
+        fieldInfos.add(log_ts);
+        elasticsearchApi.createIndexAndMapping(indexName, fieldInfos);
+        assertTrue(elasticsearchApi.indexExists(indexName));
+    }
+
+    /**
+     * Test cases for {@link ElasticsearchApi#getMappingMap(String)}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testGetMappingInfo() throws Exception {
+        final String indexName = "test_get_mapping_info";
+        final List<ElasticsearchFieldInfo> fieldInfos = new ArrayList<>();
+        final ElasticsearchFieldInfo count = new ElasticsearchFieldInfo();
+        count.setFieldType("double");
+        count.setFieldName("count");
+        fieldInfos.add(count);
+        final ElasticsearchFieldInfo date = new ElasticsearchFieldInfo();
+        date.setFieldType("date");
+        date.setFieldName("date");
+        date.setFieldFormat("yyyy-MM-dd HH:mm:ss||yyy-MM-dd||epoch_millis");
+        fieldInfos.add(date);
+        final ElasticsearchFieldInfo delay = new ElasticsearchFieldInfo();
+        delay.setFieldType("double");
+        delay.setFieldName("delay");
+        fieldInfos.add(delay);
+        final ElasticsearchFieldInfo inlong_group_id = new 
ElasticsearchFieldInfo();
+        inlong_group_id.setFieldType("text");
+        inlong_group_id.setFieldName("inlong_group_id");
+        fieldInfos.add(inlong_group_id);
+        final ElasticsearchFieldInfo inlong_stream_id = new 
ElasticsearchFieldInfo();
+        inlong_stream_id.setFieldType("text");
+        inlong_stream_id.setFieldName("inlong_stream_id");
+        fieldInfos.add(inlong_stream_id);
+        final ElasticsearchFieldInfo log_ts = new ElasticsearchFieldInfo();
+        log_ts.setFieldType("keyword");
+        log_ts.setFieldName("log_ts");
+        fieldInfos.add(log_ts);
+        elasticsearchApi.createIndexAndMapping(indexName, fieldInfos);
+        Map<String, ElasticsearchFieldInfo> result = 
elasticsearchApi.getMappingMap(indexName);
+        assertEquals("double", result.get("count").getFieldType());
+        assertEquals("double", result.get("delay").getFieldType());
+        assertEquals("date", result.get("date").getFieldType());
+        assertEquals("yyyy-MM-dd HH:mm:ss||yyy-MM-dd||epoch_millis", 
result.get("date").getFieldFormat());
+        assertEquals("text", result.get("inlong_stream_id").getFieldType());
+        assertEquals("text", result.get("inlong_group_id").getFieldType());
+        assertEquals("keyword", result.get("log_ts").getFieldType());
+    }
+
+    /**
+     * Test cases for {@link ElasticsearchApi#addFields(String, List)}.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testAddFields() throws Exception {
+        final String indexName = "test_add_fields";
+        final List<ElasticsearchFieldInfo> fieldInfos = new ArrayList<>();
+        final ElasticsearchFieldInfo count = new ElasticsearchFieldInfo();
+        count.setFieldType("double");
+        count.setFieldName("count");
+        fieldInfos.add(count);
+        elasticsearchApi.createIndexAndMapping(indexName, fieldInfos);
+
+        final List<ElasticsearchFieldInfo> addFieldInfos = new ArrayList<>();
+        final ElasticsearchFieldInfo log_ts = new ElasticsearchFieldInfo();
+        log_ts.setFieldType("keyword");
+        log_ts.setFieldName("log_ts");
+        addFieldInfos.add(log_ts);
+        elasticsearchApi.addFields(indexName, addFieldInfos);
+
+        Map<String, ElasticsearchFieldInfo> result = 
elasticsearchApi.getMappingMap(indexName);
+        assertEquals("double", result.get("count").getFieldType());
+        assertEquals("keyword", result.get("log_ts").getFieldType());
+    }
+}

Reply via email to