This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f84ad372a4 [INLONG-8676][Manager] Elasticsearch - Modify the calling
method from SDK to HTTP (#9057)
f84ad372a4 is described below
commit f84ad372a49aa3ce1eb8d1f3850201e8c57278bc
Author: haibo.duan <[email protected]>
AuthorDate: Fri Mar 15 15:01:18 2024 +0800
[INLONG-8676][Manager] Elasticsearch - Modify the calling method from SDK
to HTTP (#9057)
---
.../inlong/manager/common/util/HttpUtils.java | 18 ++
.../es/ElasticsearchAggregationsTermsInfo.java | 71 +++++
.../pojo/node/es/ElasticsearchQueryInfo.java | 91 +++++++
.../pojo/node/es/ElasticsearchQuerySortInfo.java | 50 ++++
.../manager/pojo/node/es/ElasticsearchRequest.java | 57 ++++
.../sink/es/ElasticsearchCreateIndexResponse.java | 32 +++
.../sink/es/ElasticsearchIndexMappingInfo.java | 84 ++++++
inlong-manager/manager-service/pom.xml | 40 +--
.../service/core/impl/AgentServiceImpl.java | 2 +-
.../service/core/impl/AuditServiceImpl.java | 140 +++++++---
.../service/group/AbstractGroupOperator.java | 2 +-
.../node/es/ElasticsearchDataNodeOperator.java | 3 +-
.../service/resource/sink/es/ElasticsearchApi.java | 289 ++++++++++++---------
.../resource/sink/es/ElasticsearchConfig.java | 106 +++++---
.../manager/service/sink/ElasticsearchApiTest.java | 248 ++++++++++++++++++
15 files changed, 996 insertions(+), 237 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
index 5f5d492da6..85a971e706 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
@@ -185,6 +185,24 @@ public class HttpUtils {
}
}
+ /**
+ * Send HEAD request to the specified URL.
+ */
+ public static boolean headRequest(RestTemplate restTemplate, String url,
Map<String, Object> params,
+ HttpHeaders header) {
+ ResponseEntity<String> exchange;
+ boolean result = false;
+ HttpEntity<String> request = new HttpEntity(params, header);
+ log.debug("send request to {}, param {}", url, params);
+ exchange = restTemplate.exchange(url, HttpMethod.HEAD, request,
String.class);
+ HttpStatus statusCode = exchange.getStatusCode();
+ if (statusCode.is2xxSuccessful()) {
+ result = statusCode.is2xxSuccessful();
+ }
+ log.debug("success request to {}, status code {}, body {}", url,
statusCode, exchange.getBody());
+ return result;
+ }
+
/**
* Send GET request to the specified URL.
*/
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchAggregationsTermsInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchAggregationsTermsInfo.java
new file mode 100644
index 0000000000..f5acc844d7
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchAggregationsTermsInfo.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.es;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ElasticsearchAggregationsTermsInfo {
+
+ /**
+ * Aggregations terms field.
+ */
+ private String field;
+
+ /**
+ * Aggregations size.
+ */
+ private int size;
+
+ /**
+ * Aggregations info.
+ */
+ private Map<String, Sum> aggregations;
+
+ @Data
+ @Builder
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class Sum {
+
+ /**
+ * Aggregations sum field.
+ */
+ private Field sum;
+ }
+
+ @Data
+ @Builder
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class Field {
+
+ /**
+ * Aggregations field value.
+ */
+ private String field;
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchQueryInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchQueryInfo.java
new file mode 100644
index 0000000000..d97cae54b1
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchQueryInfo.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.es;
+
+import com.google.gson.annotations.SerializedName;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+import java.util.Map;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ElasticsearchQueryInfo {
+
+ /**
+ * Bool query value.
+ */
+ private QueryBool bool;
+
+ @Data
+ @Builder
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class QueryBool {
+
+ /**
+ * Must query value.
+ */
+ private List<QueryTerm> must;
+
+ /**
+ * Use the boost parameter to boost search results.
+ */
+ private double boost;
+
+ /**
+ * adjust pure negative.
+ */
+ @SerializedName("adjust_pure_negative")
+ private boolean adjustPureNegative;
+ }
+
+ @Data
+ @Builder
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class QueryTerm {
+
+ /**
+ * Term query value.
+ */
+ private Map<String, TermValue> term;
+ }
+
+ @Data
+ @Builder
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class TermValue {
+
+ /**
+ * Term query value.
+ */
+ private String value;
+
+ /**
+ * Use the boost parameter to boost search results.
+ */
+ private double boost;
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchQuerySortInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchQuerySortInfo.java
new file mode 100644
index 0000000000..b34f822ed4
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchQuerySortInfo.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.es;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+import java.util.Map;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ElasticsearchQuerySortInfo {
+
+ /**
+ * Query sort list.
+ */
+ private List<Map<String, SortValue>> sort;
+
+ @Data
+ @Builder
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class SortValue {
+
+ /**
+ * Sort value.
+ */
+ private String order;
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchRequest.java
new file mode 100644
index 0000000000..42fc348757
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchRequest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.es;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ElasticsearchRequest {
+
+ /**
+ * From index to start the search from. Defaults to 0.
+ */
+ private int from;
+
+ /**
+ * The number of search hits to return. Defaults to 10.
+ */
+ private int size;
+
+ /**
+ * Sets the search query for this request.
+ */
+ private ElasticsearchQueryInfo query;
+
+ /**
+ * Adds a sort against the given field name and the sort ordering.
+ */
+ private ElasticsearchQuerySortInfo sort;
+
+ /**
+ * Add an aggregation to perform as part of the search.
+ */
+ private Map<String, Map<String, ElasticsearchAggregationsTermsInfo>>
aggregations;
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchCreateIndexResponse.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchCreateIndexResponse.java
new file mode 100644
index 0000000000..38cb17eba7
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchCreateIndexResponse.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.es;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class ElasticsearchCreateIndexResponse {
+
+ private String acknowledged;
+ private String shardsAcknowledged;
+ private String index;
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchIndexMappingInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchIndexMappingInfo.java
new file mode 100644
index 0000000000..f66986b956
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchIndexMappingInfo.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.es;
+
+import com.google.gson.annotations.SerializedName;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ElasticsearchIndexMappingInfo {
+
+ /**
+ * mapping.
+ */
+ private IndexMappings mappings;
+
+ @Data
+ @Builder
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class IndexMappings {
+
+ /**
+ * properties.
+ */
+ private Map<String, IndexField> properties;
+ }
+
+ @Data
+ @Builder
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class IndexField {
+
+ /**
+ * Index field type.
+ */
+ private String type;
+
+ /**
+ * Index text type field analyzer.
+ */
+ private String analyzer;
+
+ /**
+ * Index text type field search analyzer.
+ */
+ @SerializedName("search_analyzer")
+ private String searchAnalyzer;
+
+ /**
+ * Index date type field search format.
+ */
+ private String format;
+
+ /**
+ * Index scaled_float type scaling factor.
+ */
+ @SerializedName("scaling_factor")
+ private String scalingFactor;
+ }
+}
diff --git a/inlong-manager/manager-service/pom.xml
b/inlong-manager/manager-service/pom.xml
index 2a503a4a90..1a0d975776 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -225,38 +225,6 @@
<artifactId>hive-standalone-metastore</artifactId>
<version>${hive3x.version}</version>
</dependency>
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-high-level-client</artifactId>
- </dependency>
- <dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
- <exclusions>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.yaml</groupId>
- <artifactId>snakeyaml</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.carrotsearch</groupId>
- <artifactId>hppc</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-client-sniffer</artifactId>
- <exclusions>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
@@ -507,12 +475,20 @@
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
+
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>pulsar</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 3b1741deb3..953df33e96 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -65,13 +65,13 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import lombok.Getter;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
-import org.elasticsearch.common.util.set.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index a2163ce9ac..281dafe170 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -43,6 +43,14 @@ import org.apache.inlong.manager.pojo.audit.AuditRequest;
import org.apache.inlong.manager.pojo.audit.AuditSourceRequest;
import org.apache.inlong.manager.pojo.audit.AuditSourceResponse;
import org.apache.inlong.manager.pojo.audit.AuditVO;
+import
org.apache.inlong.manager.pojo.node.es.ElasticsearchAggregationsTermsInfo;
+import
org.apache.inlong.manager.pojo.node.es.ElasticsearchAggregationsTermsInfo.Field;
+import
org.apache.inlong.manager.pojo.node.es.ElasticsearchAggregationsTermsInfo.Sum;
+import org.apache.inlong.manager.pojo.node.es.ElasticsearchQueryInfo;
+import org.apache.inlong.manager.pojo.node.es.ElasticsearchQueryInfo.QueryBool;
+import org.apache.inlong.manager.pojo.node.es.ElasticsearchQuerySortInfo;
+import
org.apache.inlong.manager.pojo.node.es.ElasticsearchQuerySortInfo.SortValue;
+import org.apache.inlong.manager.pojo.node.es.ElasticsearchRequest;
import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.pojo.user.UserRoleCode;
import org.apache.inlong.manager.service.audit.InlongAuditSourceOperator;
@@ -51,19 +59,15 @@ import org.apache.inlong.manager.service.core.AuditService;
import org.apache.inlong.manager.service.resource.sink.ck.ClickHouseConfig;
import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.jdbc.SQL;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.search.aggregations.Aggregation;
-import org.elasticsearch.search.aggregations.AggregationBuilders;
-import org.elasticsearch.search.aggregations.bucket.terms.ParsedTerms;
-import
org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
-import org.elasticsearch.search.aggregations.metrics.sum.ParsedSum;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.elasticsearch.search.sort.SortOrder;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@@ -94,8 +98,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import static org.elasticsearch.index.query.QueryBuilders.termQuery;
-
/**
* Audit service layer implementation
*/
@@ -104,6 +106,7 @@ import static
org.elasticsearch.index.query.QueryBuilders.termQuery;
public class AuditServiceImpl implements AuditService {
private static final Logger LOGGER =
LoggerFactory.getLogger(AuditServiceImpl.class);
+ private static final Gson GSON = new GsonBuilder().create();
private static final String SECOND_FORMAT = "yyyy-MM-dd HH:mm:ss";
private static final String HOUR_FORMAT = "yyyy-MM-dd HH";
private static final String DAY_FORMAT = "yyyy-MM-dd";
@@ -111,6 +114,24 @@ public class AuditServiceImpl implements AuditService {
private static final DateTimeFormatter HOUR_DATE_FORMATTER =
DateTimeFormat.forPattern(HOUR_FORMAT);
private static final DateTimeFormatter DAY_DATE_FORMATTER =
DateTimeFormat.forPattern(DAY_FORMAT);
+ private static final double DEFAULT_BOOST = 1.0;
+ private static final boolean ADJUST_PURE_NEGATIVE = true;
+ private static final int QUERY_FROM = 0;
+ private static final int QUERY_SIZE = 0;
+ private static final String SORT_ORDER = "ASC";
+ private static final String TERM_FILED = "log_ts";
+ private static final String AGGREGATIONS_COUNT = "count";
+ private static final String AGGREGATIONS_DELAY = "delay";
+ private static final String AGGREGATIONS = "aggregations";
+ private static final String BUCKETS = "buckets";
+ private static final String KEY = "key";
+ private static final String VALUE = "value";
+ private static final String INLONG_GROUP_ID = "inlong_group_id";
+ private static final String INLONG_STREAM_ID = "inlong_stream_id";
+ private static final String COUNT = "count";
+ private static final String DELAY = "delay";
+ private static final String TERMS = "terms";
+
// key: type of audit base item, value: entity of audit base item
private final Map<String, AuditBaseEntity> auditSentItemMap = new
ConcurrentHashMap<>();
@@ -323,18 +344,23 @@ public class AuditServiceImpl implements AuditService {
LOGGER.warn("elasticsearch index={} not exists", index);
continue;
}
- SearchResponse response =
elasticsearchApi.search(toAuditSearchRequest(index, groupId, streamId));
- final List<Aggregation> aggregations =
response.getAggregations().asList();
- if (CollectionUtils.isNotEmpty(aggregations)) {
- ParsedTerms terms = (ParsedTerms) aggregations.get(0);
- if (CollectionUtils.isNotEmpty(terms.getBuckets())) {
- List<AuditInfo> auditSet =
terms.getBuckets().stream().map(bucket -> {
+ JsonObject response = elasticsearchApi.search(index,
toAuditSearchRequestJson(groupId, streamId));
+ JsonObject aggregations =
response.getAsJsonObject(AGGREGATIONS).getAsJsonObject(TERM_FILED);
+ if (!aggregations.isJsonNull()) {
+ JsonObject logTs =
aggregations.getAsJsonObject(TERM_FILED);
+ if (!logTs.isJsonNull()) {
+ JsonArray buckets = logTs.getAsJsonArray(BUCKETS);
+ List<AuditInfo> auditSet = new ArrayList<>();
+ for (int i = 0; i < buckets.size(); i++) {
+ JsonObject bucket =
buckets.get(i).getAsJsonObject();
AuditInfo vo = new AuditInfo();
- vo.setLogTs(bucket.getKeyAsString());
- vo.setCount((long) ((ParsedSum)
bucket.getAggregations().asList().get(0)).getValue());
- vo.setDelay((long) ((ParsedSum)
bucket.getAggregations().asList().get(1)).getValue());
- return vo;
- }).collect(Collectors.toList());
+ vo.setLogTs(bucket.get(KEY).getAsString());
+ vo.setCount((long)
bucket.get(AGGREGATIONS_COUNT).getAsJsonObject().get(VALUE)
+ .getAsLong());
+ vo.setDelay((long)
bucket.get(AGGREGATIONS_DELAY).getAsJsonObject().get(VALUE)
+ .getAsLong());
+ auditSet.add(vo);
+ }
result.add(new AuditVO(auditId, auditName, auditSet,
auditIdMap.getOrDefault(auditId, null)));
}
}
@@ -458,27 +484,61 @@ public class AuditServiceImpl implements AuditService {
}
/**
- * Convert to elasticsearch search request
+ * Convert to elasticsearch search request json
*
- * @param index The index of elasticsearch
* @param groupId The groupId of inlong
* @param streamId The streamId of inlong
- * @return The search request of elasticsearch
+ * @return The search request of elasticsearch json
*/
- private SearchRequest toAuditSearchRequest(String index, String groupId,
String streamId) {
- TermsAggregationBuilder builder =
AggregationBuilders.terms("log_ts").field("log_ts")
-
.size(Integer.MAX_VALUE).subAggregation(AggregationBuilders.sum("count").field("count"))
-
.subAggregation(AggregationBuilders.sum("delay").field("delay"));
- BoolQueryBuilder filterBuilder = new BoolQueryBuilder();
- filterBuilder.must(termQuery("inlong_group_id", groupId));
- filterBuilder.must(termQuery("inlong_stream_id", streamId));
- SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
- sourceBuilder.aggregation(builder);
- sourceBuilder.query(filterBuilder);
- sourceBuilder.from(0);
- sourceBuilder.size(0);
- sourceBuilder.sort("log_ts", SortOrder.ASC);
- return new SearchRequest(new String[]{index}, sourceBuilder);
+ public static JsonObject toAuditSearchRequestJson(String groupId, String
streamId) {
+ Map<String, ElasticsearchQueryInfo.TermValue> groupIdMap =
Maps.newHashMap();
+ groupIdMap.put(INLONG_GROUP_ID, new
ElasticsearchQueryInfo.TermValue(groupId, DEFAULT_BOOST));
+ ElasticsearchQueryInfo.QueryTerm groupIdTerm =
ElasticsearchQueryInfo.QueryTerm.builder().term(groupIdMap)
+ .build();
+ Map<String, ElasticsearchQueryInfo.TermValue> streamIdMap =
Maps.newHashMap();
+ streamIdMap.put(INLONG_STREAM_ID, new
ElasticsearchQueryInfo.TermValue(streamId, DEFAULT_BOOST));
+ ElasticsearchQueryInfo.QueryTerm streamIdTerm =
ElasticsearchQueryInfo.QueryTerm.builder().term(streamIdMap)
+ .build();
+ QueryBool boolInfo = QueryBool.builder()
+ .must(Lists.newArrayList(groupIdTerm, streamIdTerm))
+ .boost(DEFAULT_BOOST)
+ .adjustPureNegative(ADJUST_PURE_NEGATIVE)
+ .build();
+ ElasticsearchQueryInfo queryInfo =
ElasticsearchQueryInfo.builder().bool(boolInfo).build();
+
+ Map<String, SortValue> termValueInfoMap = Maps.newHashMap();
+ termValueInfoMap.put(TERM_FILED, new SortValue(SORT_ORDER));
+ List<Map<String, SortValue>> list =
Lists.newArrayList(termValueInfoMap);
+ ElasticsearchQuerySortInfo sortInfo =
ElasticsearchQuerySortInfo.builder().sort(list).build();
+
+ Sum countSum = Sum.builder()
+ .sum(new Field(COUNT))
+ .build();
+ Sum delaySum = Sum.builder()
+ .sum(new Field(DELAY))
+ .build();
+ Map<String, Sum> aggregations = Maps.newHashMap();
+ aggregations.put(COUNT, countSum);
+ aggregations.put(DELAY, delaySum);
+ ElasticsearchAggregationsTermsInfo termsInfo =
ElasticsearchAggregationsTermsInfo.builder()
+ .field(TERM_FILED)
+ .size(Integer.MAX_VALUE)
+ .aggregations(aggregations)
+ .build();
+ Map<String, ElasticsearchAggregationsTermsInfo> terms =
Maps.newHashMap();
+ terms.put(TERMS, termsInfo);
+ Map<String, Map<String, ElasticsearchAggregationsTermsInfo>> logTs =
Maps.newHashMap();
+ logTs.put(TERM_FILED, terms);
+
+ ElasticsearchRequest request = ElasticsearchRequest.builder()
+ .from(QUERY_FROM)
+ .size(QUERY_SIZE)
+ .query(queryInfo)
+ .sort(sortInfo)
+ .aggregations(logTs)
+ .build();
+
+ return GSON.toJsonTree(request).getAsJsonObject();
}
/**
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
index bd2779bbc2..5c537519be 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
@@ -37,8 +37,8 @@ import
org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
-import org.elasticsearch.common.util.set.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
index 7c25f0502a..52616f604e 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
@@ -34,7 +34,6 @@ import
org.apache.inlong.manager.service.resource.sink.es.ElasticsearchConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
-import org.elasticsearch.client.RequestOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -108,7 +107,7 @@ public class ElasticsearchDataNodeOperator extends
AbstractDataNodeOperator {
client.setEsConfig(config);
boolean result;
try {
- result = client.getEsClient().ping(RequestOptions.DEFAULT);
+ result = client.ping();
LOGGER.info("elasticsearch connection is {} for url={},
username={}, password={}", result, url, username,
password);
return result;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
index 266655d4dc..e88bb102b3 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
@@ -17,28 +17,32 @@
package org.apache.inlong.manager.service.resource.sink.es;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.util.HttpUtils;
+import org.apache.inlong.manager.pojo.sink.es.ElasticsearchCreateIndexResponse;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchFieldInfo;
+import org.apache.inlong.manager.pojo.sink.es.ElasticsearchIndexMappingInfo;
+import
org.apache.inlong.manager.pojo.sink.es.ElasticsearchIndexMappingInfo.IndexField;
+import
org.apache.inlong.manager.pojo.sink.es.ElasticsearchIndexMappingInfo.IndexMappings;
-import org.apache.commons.collections.CollectionUtils;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.indices.CreateIndexRequest;
-import org.elasticsearch.client.indices.CreateIndexResponse;
-import org.elasticsearch.client.indices.GetMappingsRequest;
-import org.elasticsearch.client.indices.PutMappingRequest;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Component;
+import org.springframework.web.client.HttpClientErrorException.NotFound;
+import sun.misc.BASE64Encoder;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -46,194 +50,233 @@ import java.util.Map;
/**
* elasticsearch template service
*/
+@Slf4j
@Component
public class ElasticsearchApi {
+ private static final Gson GSON = new GsonBuilder().create();
+
+ private static final String MAPPINGS_KEY = "mappings";
+
private static final String FIELD_KEY = "properties";
+ private static final String CONTENT_TYPE_KEY = "Content-Type";
+
+ private static final String FIELD_TYPE = "type";
+
+ private static final String FIELD_FORMAT = "format";
+
+ private static final String CONTENT_TYPE_VALUE =
"application/json;charset=UTF-8";
+
private static final Logger LOG =
LoggerFactory.getLogger(ElasticsearchApi.class);
@Autowired
private ElasticsearchConfig esConfig;
/**
- * Search
+ * Get http headers by token.
*
- * @param searchRequest The search request of Elasticsearch
- * @return Search reponse of Elasticsearch
- * @throws IOException The io exception may throws
+ * @return http header infos
*/
- public SearchResponse search(SearchRequest searchRequest) throws
IOException {
- return search(searchRequest, RequestOptions.DEFAULT);
+ private HttpHeaders getHttpHeaders() {
+ HttpHeaders headers = new HttpHeaders();
+ headers.add(CONTENT_TYPE_KEY, CONTENT_TYPE_VALUE);
+ if (esConfig.getAuthEnable()) {
+ if (StringUtils.isNotEmpty(esConfig.getUsername()) &&
StringUtils.isNotEmpty(esConfig.getPassword())) {
+ String tokenStr = esConfig.getUsername() + ":" +
esConfig.getPassword();
+ String token = String.valueOf(new
BASE64Encoder().encode(tokenStr.getBytes(StandardCharsets.UTF_8)));
+ headers.add("Authorization", "Basic " + token);
+ }
+ }
+ return headers;
}
/**
- * Search
+ * Search.
*
- * @param searchRequest The search request of Elasticsearch
- * @param options The options of Elasticsearch
- * @return Search reponse of Elasticsearch
- * @throws IOException The io exception may throws
+ * @param indexName The index name
+ * @param request The request json
+ * @return the elasticsearch seqrch result
+ * @throws Exception any exception if occurred
*/
- public SearchResponse search(SearchRequest searchRequest, RequestOptions
options) throws IOException {
- LOG.info("get es search request of {}",
searchRequest.source().toString());
- return getEsClient().search(searchRequest, options);
+ public JsonObject search(String indexName, JsonObject request) throws
Exception {
+ LOG.info("get es search es index:{} request:{}", indexName,
request.toString());
+ final String url = esConfig.getOneHttpUrl() + InlongConstants.SLASH +
indexName + "/_search";
+ return HttpUtils.request(esConfig.getRestClient(), url,
HttpMethod.POST, request.toString(), getHttpHeaders(),
+ JsonObject.class);
}
/**
- * Check index exists
+ * Check index exists.
*
- * @param indexName The index name of Elasticsearch
- * @return true if exists else false
- * @throws IOException The exception may throws
+ * @param indexName The elasticsearch index name
+ * @return true or false
+ * @throws Exception any exception if occurred
*/
- public boolean indexExists(String indexName) throws IOException {
- GetIndexRequest getIndexRequest = new GetIndexRequest();
- getIndexRequest.indices(indexName);
- return getEsClient().indices().exists(getIndexRequest,
RequestOptions.DEFAULT);
+ public boolean indexExists(String indexName) throws Exception {
+ final String url = esConfig.getOneHttpUrl() + InlongConstants.SLASH +
indexName;
+ try {
+ return HttpUtils.headRequest(esConfig.getRestClient(), url, null,
getHttpHeaders());
+ } catch (NotFound e) {
+ return false;
+ }
}
/**
- * Create index
+ * Check if the cluster is available.
*
- * @param indexName The index name of Elasticsearch
- * @throws IOException The exception may throws
+ * @return true or false
*/
- public void createIndex(String indexName) throws IOException {
- CreateIndexRequest createIndexRequest = new
CreateIndexRequest(indexName);
+ public boolean ping() throws Exception {
+ final String[] urls = esConfig.getHttpUrls(InlongConstants.SLASH);
+ boolean result = true;
+ for (String url : urls) {
+ result &= HttpUtils.headRequest(esConfig.getRestClient(), url,
null, getHttpHeaders());
+ }
+ return result;
+ }
- CreateIndexResponse createIndexResponse = getEsClient().indices()
- .create(createIndexRequest, RequestOptions.DEFAULT);
- LOG.info("create es index:{} result: {}", indexName,
createIndexResponse.isAcknowledged());
+ /**
+ * Create index by REST API.
+ *
+ * @param indexName elasticsearch index name
+ * @throws IOException any IOException if occurred
+ */
+ public void createIndex(String indexName) throws Exception {
+ final String url = esConfig.getOneHttpUrl() + InlongConstants.SLASH +
indexName;
+ ElasticsearchCreateIndexResponse response =
HttpUtils.request(esConfig.getRestClient(), url, HttpMethod.PUT,
+ null, getHttpHeaders(),
ElasticsearchCreateIndexResponse.class);
+ LOG.info("create es index:{} result: {}", indexName,
response.getAcknowledged());
}
/**
- * Get mapping info
+ * Get mapping info.
*
* @param fieldsInfo The fields info of Elasticsearch
- * @return String list of fields translation
+ * @return elasticsearch index mappings object {@link
ElasticsearchIndexMappingInfo}
* @throws IOException The exception may throws
*/
- private List<String> getMappingInfo(List<ElasticsearchFieldInfo>
fieldsInfo) {
- List<String> fieldList = new ArrayList<>();
+ private ElasticsearchIndexMappingInfo
getMappingInfo(List<ElasticsearchFieldInfo> fieldsInfo) {
+ Map<String, IndexField> fields = Maps.newHashMap();
for (ElasticsearchFieldInfo field : fieldsInfo) {
- StringBuilder fieldStr = new StringBuilder().append("
\"").append(field.getFieldName())
- .append("\" : {\n \"type\" : \"")
- .append(field.getFieldType()).append("\"");
+ IndexField indexField = new IndexField();
+ fields.put(field.getFieldName(), indexField);
+ indexField.setType(field.getFieldType());
if (field.getFieldType().equals("text")) {
if (StringUtils.isNotEmpty(field.getAnalyzer())) {
- fieldStr.append(",\n \"analyzer\" : \"")
- .append(field.getAnalyzer()).append("\"");
+ indexField.setAnalyzer(field.getAnalyzer());
}
if (StringUtils.isNotEmpty(field.getSearchAnalyzer())) {
- fieldStr.append(",\n \"search_analyzer\" : \"")
- .append(field.getSearchAnalyzer()).append("\"");
+ indexField.setSearchAnalyzer(field.getSearchAnalyzer());
}
} else if (field.getFieldType().equals("date")) {
if (StringUtils.isNotEmpty(field.getFieldFormat())) {
- fieldStr.append(",\n \"format\" : \"")
- .append(field.getFieldFormat()).append("\"");
+ indexField.setFormat(field.getFieldFormat());
}
} else if (field.getFieldType().equals("scaled_float")) {
if (StringUtils.isNotEmpty(field.getScalingFactor())) {
- fieldStr.append(",\n \"scaling_factor\" : \"")
- .append(field.getScalingFactor()).append("\"");
+ indexField.setScalingFactor(field.getScalingFactor());
}
}
- fieldStr.append("\n }");
- fieldList.add(fieldStr.toString());
}
- return fieldList;
+ return
ElasticsearchIndexMappingInfo.builder().mappings(IndexMappings.builder()
+ .properties(fields).build()).build();
}
/**
- * Create index and mapping
+ * Create index and mapping by REST API.
*
* @param indexName Index name of creating
* @param fieldInfos Field infos
- * @throws IOException The exception may throws
+ * @throws Exception The exception may throws
*/
- public void createIndexAndMapping(String indexName,
- List<ElasticsearchFieldInfo> fieldInfos) throws IOException {
- CreateIndexRequest createIndexRequest = new
CreateIndexRequest(indexName);
- List<String> fieldList = getMappingInfo(fieldInfos);
- StringBuilder mapping = new StringBuilder().append("{\n
\"properties\" : {\n")
- .append(StringUtils.join(fieldList, ",\n")).append("\n
}\n}");
- createIndexRequest.mapping(mapping.toString(), XContentType.JSON);
-
- CreateIndexResponse createIndexResponse = getEsClient().indices()
- .create(createIndexRequest, RequestOptions.DEFAULT);
- LOG.info("create {}:{}", indexName,
createIndexResponse.isAcknowledged());
+ public void createIndexAndMapping(String indexName,
List<ElasticsearchFieldInfo> fieldInfos) throws Exception {
+ ElasticsearchIndexMappingInfo mappingInfo = getMappingInfo(fieldInfos);
+ final String url = esConfig.getOneHttpUrl() + "/" + indexName;
+ ElasticsearchCreateIndexResponse response =
HttpUtils.request(esConfig.getRestClient(), url, HttpMethod.PUT,
+ GSON.toJsonTree(mappingInfo).getAsJsonObject().toString(),
getHttpHeaders(),
+ ElasticsearchCreateIndexResponse.class);
+ LOG.info("create {}:{}", indexName, response.getIndex());
}
/**
- * Get fields
+ * Get mapping map.
*
- * @param indexName The index name of Elasticsearch
- * @return a {@link Map} collection that contains {@link String}
- * as key and {@link MappingMetaData} as value.
- * @throws IOException The exception may throws
+ * @param indexName elasticsearch index name
+ * @return map of elasticsearch index mapping info
*/
- public Map<String, MappingMetaData> getFields(String indexName) throws
IOException {
- GetMappingsRequest request = new
GetMappingsRequest().indices(indexName);
- return getEsClient().indices().getMapping(request,
RequestOptions.DEFAULT).mappings();
+ public Map<String, ElasticsearchFieldInfo> getMappingMap(String indexName)
throws Exception {
+ final String url = esConfig.getOneHttpUrl() + InlongConstants.SLASH +
indexName + "/_mapping";
+ JsonObject result = HttpUtils.request(esConfig.getRestClient(), url,
HttpMethod.GET, null, getHttpHeaders(),
+ JsonObject.class);
+ JsonObject mappings = result.getAsJsonObject(indexName);
+ JsonObject properties = null;
+ JsonObject fields = null;
+ Map<String, ElasticsearchFieldInfo> fieldInfos = Maps.newHashMap();
+ if (ObjectUtils.isNotEmpty(mappings)) {
+ properties = mappings.getAsJsonObject(MAPPINGS_KEY);
+ }
+ if (ObjectUtils.isNotEmpty(properties)) {
+ fields = properties.getAsJsonObject(FIELD_KEY);
+ }
+ if (ObjectUtils.isNotEmpty(fields)) {
+ for (String key : fields.keySet()) {
+ JsonObject field = fields.getAsJsonObject(key);
+ if (StringUtils.isNotEmpty(key) &&
ObjectUtils.isNotEmpty(field)) {
+ ElasticsearchFieldInfo fieldInfo = new
ElasticsearchFieldInfo();
+ if (ObjectUtils.isNotEmpty(field.get(FIELD_TYPE))) {
+
fieldInfo.setFieldType(field.get(FIELD_TYPE).getAsString());
+ }
+ if (ObjectUtils.isNotEmpty(field.get(FIELD_FORMAT))) {
+
fieldInfo.setFieldFormat(field.get(FIELD_FORMAT).getAsString());
+ }
+ fieldInfo.setFieldName(key);
+ fieldInfos.put(key, fieldInfo);
+ }
+ }
+ }
+ return fieldInfos;
}
/**
- * Add fieldss
+ * Add fields by REST API.
*
- * @param indexName The index name of Elasticsearch
- * @param fieldInfos The fields info of Elasticsearch
- * @throws IOException The exception may throws
+ * @param indexName elasticsearch index name
+ * @param fieldInfos elasticsearch field infos
+ * @throws Exception any exception if occurred
*/
- public void addFields(String indexName, List<ElasticsearchFieldInfo>
fieldInfos) throws IOException {
- if (CollectionUtils.isNotEmpty(fieldInfos)) {
- List<String> fieldList = getMappingInfo(fieldInfos);
- StringBuilder mapping = new StringBuilder().append("{\n
\"properties\" : {\n")
- .append(StringUtils.join(fieldList, ",\n")).append("\n
}\n}");
- System.out.println(mapping.toString());
- PutMappingRequest indexRequest = new PutMappingRequest(indexName)
- .source(mapping.toString(), XContentType.JSON);
- AcknowledgedResponse acknowledgedResponse = getEsClient().indices()
- .putMapping(indexRequest, RequestOptions.DEFAULT);
- LOG.info("put mapping: {} result: {}", mapping.toString(),
acknowledgedResponse.toString());
+ public void addFields(String indexName, List<ElasticsearchFieldInfo>
fieldInfos) throws Exception {
+ ElasticsearchIndexMappingInfo mappingInfo = getMappingInfo(fieldInfos);
+ if (ObjectUtils.isNotEmpty(mappingInfo) &&
!mappingInfo.getMappings().getProperties().isEmpty()) {
+ String url = esConfig.getOneHttpUrl() + InlongConstants.SLASH +
indexName + "/_mapping";
+ HttpUtils.request(esConfig.getRestClient(), url, HttpMethod.PUT,
+
GSON.toJsonTree(mappingInfo.getMappings()).getAsJsonObject().toString(),
getHttpHeaders(),
+ Object.class);
}
}
/**
- * Add not exist fields
+ * Add not exist fields by REST API.
*
- * @param indexName The index name of elasticsearch
- * @param fieldInfos The fields info of elasticsearch
- * @throws IOException The exception may throws
+ * @param indexName elasticsearch index name
+ * @param fieldInfos elasticsearch field infos
+ * @throws Exception any exception if occurred
*/
- public void addNotExistFields(String indexName,
- List<ElasticsearchFieldInfo> fieldInfos) throws IOException {
- List<ElasticsearchFieldInfo> notExistFieldInfos = new
ArrayList<>(fieldInfos);
- Map<String, MappingMetaData> mapping = getFields(indexName);
- Map<String, Object> filedMap = (Map<String, Object>)
mapping.get(indexName).getSourceAsMap().get(FIELD_KEY);
- for (String key : filedMap.keySet()) {
- for (ElasticsearchFieldInfo field : notExistFieldInfos) {
- if (field.getFieldName().equals(key)) {
- notExistFieldInfos.remove(field);
- break;
- }
+ public void addNotExistFields(String indexName,
List<ElasticsearchFieldInfo> fieldInfos) throws Exception {
+ List<ElasticsearchFieldInfo> notExistFieldInfos = new ArrayList<>();
+ Map<String, ElasticsearchFieldInfo> mappings =
getMappingMap(indexName);
+ for (ElasticsearchFieldInfo fieldInfo : fieldInfos) {
+ if (!mappings.containsKey(fieldInfo.getFieldName())) {
+ notExistFieldInfos.add(fieldInfo);
}
}
- addFields(indexName, notExistFieldInfos);
- }
-
- /**
- * Get Elasticsearch client
- *
- * @return RestHighLevelClient
- */
- public RestHighLevelClient getEsClient() {
- return esConfig.highLevelClient();
+ if (!notExistFieldInfos.isEmpty()) {
+ addFields(indexName, notExistFieldInfos);
+ }
}
/**
- * Get Elasticsearch client
+ * Get Elasticsearch configuration.
*
* @param config Elasticsearch's configuration
*/
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java
index 35f1272bd4..6f4580897a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchConfig.java
@@ -22,20 +22,16 @@ import
org.apache.inlong.manager.common.consts.InlongConstants;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
/**
* Elasticsearch config information, including host, port, etc.
@@ -45,7 +41,13 @@ import java.util.List;
public class ElasticsearchConfig {
private static final Logger logger =
LoggerFactory.getLogger(ElasticsearchConfig.class);
- private static RestHighLevelClient highLevelClient;
+
+ private final Random rand = new Random();
+
+ @Autowired
+ private RestTemplate restTemplate;
+
+ private List<HttpHost> httpHosts;
@Value("${es.index.search.hostname}")
private String hosts;
@Value("${es.auth.enable}")
@@ -56,54 +58,82 @@ public class ElasticsearchConfig {
private String password;
/**
- * highLevelClient
+ * Get http rest client.
*
- * @return RestHighLevelClient
+ * @return springframework RestTemplate
*/
- public RestHighLevelClient highLevelClient() {
- if (highLevelClient != null) {
- return highLevelClient;
+ public RestTemplate getRestClient() {
+ if (restTemplate != null) {
+ return restTemplate;
}
try {
- synchronized (RestHighLevelClient.class) {
- if (highLevelClient == null) {
- List<HttpHost> hosts = new ArrayList<>();
- String[] hostArrays =
this.hosts.split(InlongConstants.SEMICOLON);
- for (String host : hostArrays) {
- if (StringUtils.isNotBlank(host)) {
- host = host.trim();
- hosts.add(HttpHost.create(host));
- }
- }
- RestClientBuilder clientBuilder =
RestClient.builder(hosts.toArray(new HttpHost[0]));
- this.setEsAuth(clientBuilder);
- highLevelClient = new RestHighLevelClient(clientBuilder);
+ synchronized (RestTemplate.class) {
+ if (restTemplate == null) {
+ restTemplate = new RestTemplate();
}
}
} catch (Exception e) {
logger.error("get es high level client error", e);
}
- return highLevelClient;
+ return restTemplate;
}
/**
- * Elasticsearch authentication
+ * Get http hosts.
*
- * @param builder The builder
+ * @return list of http host info
*/
- private void setEsAuth(RestClientBuilder builder) {
+ public List<HttpHost> getHttpHosts() {
+ if (httpHosts != null) {
+ return httpHosts;
+ }
try {
- logger.info("set es auth of enable={}", authEnable);
- if (authEnable) {
- final CredentialsProvider credentialsProvider = new
BasicCredentialsProvider();
- credentialsProvider.setCredentials(AuthScope.ANY, new
UsernamePasswordCredentials(username, password));
- builder.setHttpClientConfigCallback(
- httpClientBuilder ->
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
-
+ synchronized (HttpHost.class) {
+ httpHosts = new ArrayList<>();
+ String[] hostArrays =
this.hosts.split(InlongConstants.SEMICOLON);
+ for (String host : hostArrays) {
+ if (StringUtils.isNotBlank(host)) {
+ host = host.trim();
+ httpHosts.add(HttpHost.create(host));
+ }
+ }
}
} catch (Exception e) {
- logger.error("set es auth error ", e);
+ logger.error("get es http hosts error", e);
+ }
+ return httpHosts;
+ }
+
+ /**
+ * Get one http url.
+ *
+ * @return a http url
+ * @throws Exception any exception if occurred
+ */
+ public String getOneHttpUrl() throws Exception {
+ getHttpHosts();
+ if (!httpHosts.isEmpty() && httpHosts.size() > 0) {
+ return httpHosts.get(rand.nextInt(httpHosts.size())).toString();
}
+ throw new Exception("http hosts is empty! please check hosts!");
}
+ /**
+ * Get all http url.
+ *
+ * @param urlSuffix
+ * @return http url array
+ * @throws Exception
+ */
+ public String[] getHttpUrls(String urlSuffix) throws Exception {
+ getHttpHosts();
+ if (!httpHosts.isEmpty() && httpHosts.size() > 0) {
+ String[] urls = new String[httpHosts.size()];
+ for (int i = 0; i < urls.length; i++) {
+ urls[i] = httpHosts.get(i) + urlSuffix;
+ }
+ return urls;
+ }
+ throw new Exception("http hosts is empty! please check hosts!");
+ }
}
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchApiTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchApiTest.java
new file mode 100644
index 0000000000..aabf3f008d
--- /dev/null
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchApiTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink;
+
+import org.apache.inlong.manager.pojo.sink.es.ElasticsearchFieldInfo;
+import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi;
+import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchConfig;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for {@link
org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi}.
+ */
+public class ElasticsearchApiTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ElasticsearchApiTest.class);
+
+ public static final Network NETWORK = Network.newNetwork();
+
+ private static final String INTER_CONTAINER_ELASTICSEARCH_ALIAS =
"elasticsearch";
+
+ private static final String ELASTICSEARCH_DOCKER_IMAGE_NAME =
"elasticsearch:7.9.3";
+
+ private ElasticsearchApi elasticsearchApi;
+
+ private ElasticsearchConfig elasticsearchConfig;
+
+ private static final Gson GSON = new GsonBuilder().create();
+
+ private static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new
ElasticsearchContainer(
+ DockerImageName.parse(ELASTICSEARCH_DOCKER_IMAGE_NAME)
+
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"))
+ .withNetwork(NETWORK)
+ .withAccessToHost(true)
+ .withEnv("discovery.type", "single-node")
+ .withEnv("ES_JAVA_OPTS", "-Xms512m -Xmx512m")
+ .withEnv("ELASTIC_PASSWORD", "test")
+
.withNetworkAliases(INTER_CONTAINER_ELASTICSEARCH_ALIAS)
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+ @BeforeAll
+ public static void beforeAll() {
+ ELASTICSEARCH_CONTAINER.setPortBindings(Arrays.asList("9200:9200",
"9300:9300"));
+ ELASTICSEARCH_CONTAINER.withStartupTimeout(Duration.ofSeconds(300));
+
ELASTICSEARCH_CONTAINER.setDockerImageName(ELASTICSEARCH_DOCKER_IMAGE_NAME);
+ Startables.deepStart(Stream.of(ELASTICSEARCH_CONTAINER)).join();
+ LOG.info("Containers are started.");
+ }
+
+ @BeforeEach
+ public void before() {
+ elasticsearchConfig = new ElasticsearchConfig();
+ elasticsearchConfig.setHosts("http://127.0.0.1:9200");
+ elasticsearchConfig.setAuthEnable(true);
+ elasticsearchConfig.setUsername("admin");
+ elasticsearchConfig.setPassword("inlong");
+ elasticsearchApi = new ElasticsearchApi();
+ elasticsearchApi.setEsConfig(elasticsearchConfig);
+ }
+
+ @AfterAll
+ public static void teardown() {
+ if (ELASTICSEARCH_CONTAINER != null) {
+ ELASTICSEARCH_CONTAINER.stop();
+ }
+ }
+
+ /**
+ * Test cases for {@link ElasticsearchApi#search(String, JsonObject)}.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testSearch() throws Exception {
+ final String indexName = "test_search";
+ final String searchJson = "{\n \"query\": {\n \"match_all\": {}\n
}\n}";
+ final JsonObject search = GSON.fromJson(searchJson, JsonObject.class);
+ elasticsearchApi.createIndex(indexName);
+ JsonObject result = elasticsearchApi.search(indexName, search);
+ assertNotNull(result);
+ }
+
+ /**
+ * Test cases for {@link ElasticsearchApi#indexExists(String)}.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testIndexExists() throws Exception {
+ final String indexName = "test_index_exists";
+ boolean result = elasticsearchApi.indexExists(indexName);
+ assertEquals(false, result);
+ elasticsearchApi.createIndex(indexName);
+ result = elasticsearchApi.indexExists(indexName);
+ assertEquals(true, result);
+ }
+
+ /**
+ * Test cases for {@link ElasticsearchApi#ping()}.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testPing() throws Exception {
+ boolean result = elasticsearchApi.ping();
+ assertEquals(true, result);
+ }
+
+ /**
+ * Test cases for {@link ElasticsearchApi#createIndex(String)}.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testCreateIndex() throws Exception {
+ final String indexName = "test_create_index";
+ elasticsearchApi.createIndex(indexName);
+ assertTrue(elasticsearchApi.indexExists(indexName));
+ }
+
+ /**
+ * Test cases for {@link ElasticsearchApi#createIndexAndMapping(String,
List)}.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testCreateIndexAndMapping() throws Exception {
+ final String indexName = "test_create_index_and_mapping";
+ final List<ElasticsearchFieldInfo> fieldInfos = new ArrayList<>();
+ final ElasticsearchFieldInfo log_ts = new ElasticsearchFieldInfo();
+ log_ts.setFieldType("keyword");
+ log_ts.setFieldName("log_ts");
+ fieldInfos.add(log_ts);
+ elasticsearchApi.createIndexAndMapping(indexName, fieldInfos);
+ assertTrue(elasticsearchApi.indexExists(indexName));
+ }
+
+ /**
+ * Test cases for {@link ElasticsearchApi#getMappingMap(String)}.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testGetMappingInfo() throws Exception {
+ final String indexName = "test_get_mapping_info";
+ final List<ElasticsearchFieldInfo> fieldInfos = new ArrayList<>();
+ final ElasticsearchFieldInfo count = new ElasticsearchFieldInfo();
+ count.setFieldType("double");
+ count.setFieldName("count");
+ fieldInfos.add(count);
+ final ElasticsearchFieldInfo date = new ElasticsearchFieldInfo();
+ date.setFieldType("date");
+ date.setFieldName("date");
+ date.setFieldFormat("yyyy-MM-dd HH:mm:ss||yyy-MM-dd||epoch_millis");
+ fieldInfos.add(date);
+ final ElasticsearchFieldInfo delay = new ElasticsearchFieldInfo();
+ delay.setFieldType("double");
+ delay.setFieldName("delay");
+ fieldInfos.add(delay);
+ final ElasticsearchFieldInfo inlong_group_id = new
ElasticsearchFieldInfo();
+ inlong_group_id.setFieldType("text");
+ inlong_group_id.setFieldName("inlong_group_id");
+ fieldInfos.add(inlong_group_id);
+ final ElasticsearchFieldInfo inlong_stream_id = new
ElasticsearchFieldInfo();
+ inlong_stream_id.setFieldType("text");
+ inlong_stream_id.setFieldName("inlong_stream_id");
+ fieldInfos.add(inlong_stream_id);
+ final ElasticsearchFieldInfo log_ts = new ElasticsearchFieldInfo();
+ log_ts.setFieldType("keyword");
+ log_ts.setFieldName("log_ts");
+ fieldInfos.add(log_ts);
+ elasticsearchApi.createIndexAndMapping(indexName, fieldInfos);
+ Map<String, ElasticsearchFieldInfo> result =
elasticsearchApi.getMappingMap(indexName);
+ assertEquals("double", result.get("count").getFieldType());
+ assertEquals("double", result.get("delay").getFieldType());
+ assertEquals("date", result.get("date").getFieldType());
+ assertEquals("yyyy-MM-dd HH:mm:ss||yyy-MM-dd||epoch_millis",
result.get("date").getFieldFormat());
+ assertEquals("text", result.get("inlong_stream_id").getFieldType());
+ assertEquals("text", result.get("inlong_group_id").getFieldType());
+ assertEquals("keyword", result.get("log_ts").getFieldType());
+ }
+
+ /**
+ * Test cases for {@link ElasticsearchApi#addFields(String, List)}.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testAddFields() throws Exception {
+ final String indexName = "test_add_fields";
+ final List<ElasticsearchFieldInfo> fieldInfos = new ArrayList<>();
+ final ElasticsearchFieldInfo count = new ElasticsearchFieldInfo();
+ count.setFieldType("double");
+ count.setFieldName("count");
+ fieldInfos.add(count);
+ elasticsearchApi.createIndexAndMapping(indexName, fieldInfos);
+
+ final List<ElasticsearchFieldInfo> addFieldInfos = new ArrayList<>();
+ final ElasticsearchFieldInfo log_ts = new ElasticsearchFieldInfo();
+ log_ts.setFieldType("keyword");
+ log_ts.setFieldName("log_ts");
+ addFieldInfos.add(log_ts);
+ elasticsearchApi.addFields(indexName, addFieldInfos);
+
+ Map<String, ElasticsearchFieldInfo> result =
elasticsearchApi.getMappingMap(indexName);
+ assertEquals("double", result.get("count").getFieldType());
+ assertEquals("keyword", result.get("log_ts").getFieldType());
+ }
+}