This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5cb23e18fa [INLONG-9259][Manager] Optimize Elasticsearch sink and
datanode (#9276)
5cb23e18fa is described below
commit 5cb23e18faa514f4838efd6a55c9abf579b25cf4
Author: vernedeng <[email protected]>
AuthorDate: Mon Nov 13 22:19:34 2023 +0800
[INLONG-9259][Manager] Optimize Elasticsearch sink and datanode (#9276)
---
.../pojo/node/es/ElasticsearchDataNodeDTO.java | 12 +++
.../manager/pojo/sink/es/ElasticsearchSink.java | 13 ++++
.../manager/pojo/sink/es/ElasticsearchSinkDTO.java | 57 +++-----------
.../pojo/sink/es/ElasticsearchSinkRequest.java | 37 ++-------
.../node/es/ElasticsearchDataNodeOperator.java | 5 ++
.../sink/es/ElasticsearchResourceOperator.java | 87 ----------------------
.../service/sink/es/ElasticsearchSinkOperator.java | 17 -----
.../service/sink/ElasticsearchSinkServiceTest.java | 7 --
8 files changed, 47 insertions(+), 188 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchDataNodeDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchDataNodeDTO.java
index 79fba20888..e55e1ef338 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchDataNodeDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchDataNodeDTO.java
@@ -73,6 +73,18 @@ public class ElasticsearchDataNodeDTO {
@ApiModelProperty("audit set name")
private String auditSetName;
+ @ApiModelProperty("http hosts")
+ private String httpHosts;
+
+ @ApiModelProperty("user name")
+ private String username;
+
+ @ApiModelProperty("token")
+ private String token;
+
+ @ApiModelProperty("password")
+ private String password;
+
/**
* Get the dto instance from the request
*/
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java
index 5f92dddbeb..a99c148790 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSink.java
@@ -83,6 +83,19 @@ public class ElasticsearchSink extends StreamSink {
@ApiModelProperty("The multiple index-pattern of sink")
private String indexPattern;
+ // fields below are used by sortstandalone
+ @ApiModelProperty("indexNamePattern")
+ private String indexNamePattern;
+
+ @ApiModelProperty("contentOffset")
+ private Integer contentOffset;
+
+ @ApiModelProperty("fieldOffset")
+ private Integer fieldOffset;
+
+ @ApiModelProperty("separator")
+ private String separator;
+
public ElasticsearchSink() {
this.setSinkType(SinkType.ELASTICSEARCH);
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
index 8d08988fc8..35565f2cd5 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.pojo.sink.es;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.util.AESUtils;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
@@ -32,9 +31,6 @@ import org.apache.commons.lang3.StringUtils;
import javax.validation.constraints.NotNull;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-
/**
* Sink info of Elasticsearch
*/
@@ -44,44 +40,17 @@ import java.util.Map;
@AllArgsConstructor
public class ElasticsearchSinkDTO {
- @ApiModelProperty("Host of the Elasticsearch server")
- private String hosts;
-
- @ApiModelProperty("Username of the Elasticsearch server")
- private String username;
-
- @ApiModelProperty("User password of the Elasticsearch server")
- private String password;
-
- @ApiModelProperty("Elasticsearch index name")
- private String indexName;
-
- @ApiModelProperty("Flush interval, unit: second, default is 1s")
- private Integer flushInterval;
-
- @ApiModelProperty("Flush when record number reaches flushRecord")
- private Integer flushRecord;
-
- @ApiModelProperty("Write max retry times, default is 3")
- private Integer retryTimes;
+ @ApiModelProperty("indexNamePattern")
+ private String indexNamePattern;
- @ApiModelProperty("Key field names, separate with commas")
- private String keyFieldNames;
+ @ApiModelProperty("contentOffset")
+ private Integer contentOffset;
- @ApiModelProperty("Document Type")
- private String documentType;
+ @ApiModelProperty("fieldOffset")
+ private Integer fieldOffset;
- @ApiModelProperty("Primary Key")
- private String primaryKey;
-
- @ApiModelProperty("Elasticsearch version")
- private Integer esVersion;
-
- @ApiModelProperty("Password encrypt version")
- private Integer encryptVersion;
-
- @ApiModelProperty("Properties for elasticsearch")
- private Map<String, Object> properties;
+ @ApiModelProperty("separator")
+ private String separator;
/**
* Get the dto instance from the request
@@ -98,19 +67,11 @@ public class ElasticsearchSinkDTO {
*/
public static ElasticsearchSinkDTO getFromJson(@NotNull String extParams) {
try {
- return JsonUtils.parseObject(extParams,
ElasticsearchSinkDTO.class).decryptPassword();
+ return JsonUtils.parseObject(extParams,
ElasticsearchSinkDTO.class);
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
String.format("parse extParams of Elasticsearch SinkDTO
failure: %s", e.getMessage()));
}
}
- private ElasticsearchSinkDTO decryptPassword() throws Exception {
- if (StringUtils.isNotEmpty(this.password)) {
- byte[] passwordBytes = AESUtils.decryptAsString(this.password,
this.encryptVersion);
- this.password = new String(passwordBytes, StandardCharsets.UTF_8);
- }
- return this;
- }
-
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java
index 74d009641c..182308fe0a 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java
@@ -37,37 +37,16 @@ import lombok.ToString;
@JsonTypeDefine(value = SinkType.ELASTICSEARCH)
public class ElasticsearchSinkRequest extends SinkRequest {
- @ApiModelProperty("Host of the Elasticsearch server")
- private String hosts;
+ @ApiModelProperty("indexNamePattern")
+ private String indexNamePattern;
- @ApiModelProperty("Username of the Elasticsearch server")
- private String username;
+ @ApiModelProperty("contentOffset")
+ private Integer contentOffset;
- @ApiModelProperty("User password of the Elasticsearch server")
- private String password;
+ @ApiModelProperty("fieldOffset")
+ private Integer fieldOffset;
- @ApiModelProperty("Elasticsearch index name")
- private String indexName;
-
- @ApiModelProperty("Flush interval, unit: second, default is 1s")
- private Integer flushInterval;
-
- @ApiModelProperty("Flush when record number reaches flushRecord")
- private Integer flushRecord;
-
- @ApiModelProperty("Write max retry times, default is 3")
- private Integer retryTimes;
-
- @ApiModelProperty("Key field names, separate with commas")
- private String keyFieldNames;
-
- @ApiModelProperty("Document Type")
- private String documentType;
-
- @ApiModelProperty("Primary Key")
- private String primaryKey;
-
- @ApiModelProperty("Elasticsearch version")
- private Integer esVersion;
+ @ApiModelProperty("separator")
+ private String separator;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
index 272af930bc..7c25f0502a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
@@ -45,6 +45,9 @@ public class ElasticsearchDataNodeOperator extends
AbstractDataNodeOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(ElasticsearchDataNodeOperator.class);
+ // kept for compatibility with the old sortstandalone version
+ public static final String KEY_PASSWORD = "password";
+
@Autowired
private ObjectMapper objectMapper;
@@ -65,6 +68,8 @@ public class ElasticsearchDataNodeOperator extends
AbstractDataNodeOperator {
try {
ElasticsearchDataNodeDTO dto =
ElasticsearchDataNodeDTO.getFromRequest(esRequest,
targetEntity.getExtParams());
+ dto.setHttpHosts(request.getUrl());
+ dto.setPassword(request.getToken());
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
index 70165bf541..87c1086836 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchResourceOperator.java
@@ -20,27 +20,13 @@ package org.apache.inlong.manager.service.resource.sink.es;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.SinkStatus;
-import org.apache.inlong.manager.common.exceptions.WorkflowException;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
-import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.pojo.sink.SinkInfo;
-import org.apache.inlong.manager.pojo.sink.es.ElasticsearchFieldInfo;
-import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkDTO;
-import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
import
org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator;
-import org.apache.inlong.manager.service.sink.StreamSinkService;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* Elasticsearch's resource operator
*/
@@ -48,12 +34,6 @@ import java.util.List;
public class ElasticsearchResourceOperator extends
AbstractStandaloneSinkResourceOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(ElasticsearchResourceOperator.class);
- @Autowired
- private StreamSinkService sinkService;
- @Autowired
- private StreamSinkFieldEntityMapper sinkFieldMapper;
- @Autowired
- private DataNodeOperateHelper dataNodeHelper;
@Override
public Boolean accept(String sinkType) {
@@ -75,74 +55,7 @@ public class ElasticsearchResourceOperator extends
AbstractStandaloneSinkResourc
return;
}
- this.createIndex(sinkInfo);
this.assignCluster(sinkInfo);
}
- private void createIndex(SinkInfo sinkInfo) {
- LOGGER.info("begin to create es index for sinkId={}",
sinkInfo.getId());
-
- List<StreamSinkFieldEntity> sinkList =
sinkFieldMapper.selectBySinkId(sinkInfo.getId());
- if (CollectionUtils.isEmpty(sinkList)) {
- LOGGER.warn("no es fields found, skip to create es index for
sinkId={}", sinkInfo.getId());
- }
-
- // set fields
- List<ElasticsearchFieldInfo> fieldList =
getElasticsearchFieldFromSink(sinkList);
-
- try {
- ElasticsearchApi client = new ElasticsearchApi();
- ElasticsearchSinkDTO esInfo =
ElasticsearchSinkDTO.getFromJson(sinkInfo.getExtParams());
- client.setEsConfig(getElasticsearchConfig(sinkInfo, esInfo));
- String indexName = esInfo.getIndexName();
- boolean indexExists = client.indexExists(indexName);
-
- // 3. index not exists, create it
- if (!indexExists) {
- client.createIndexAndMapping(indexName, fieldList);
- } else {
- // 4. index exists, add fields - skip the exists fields
- client.addNotExistFields(indexName, fieldList);
- }
-
- // 5. update the sink status to success
- String info = "success to create es resource";
- sinkService.updateStatus(sinkInfo.getId(),
SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
- LOGGER.info(info + " for sinkInfo={}", sinkInfo);
- } catch (Throwable e) {
- String errMsg = "Create Elasticsearch index failed: " +
e.getMessage();
- LOGGER.error(errMsg, e);
- sinkService.updateStatus(sinkInfo.getId(),
SinkStatus.CONFIG_FAILED.getCode(), errMsg);
- throw new WorkflowException(errMsg);
- }
- }
-
- public List<ElasticsearchFieldInfo>
getElasticsearchFieldFromSink(List<StreamSinkFieldEntity> sinkList) {
- List<ElasticsearchFieldInfo> esFieldList = new ArrayList<>();
- for (StreamSinkFieldEntity fieldEntity : sinkList) {
- if (StringUtils.isNotBlank(fieldEntity.getExtParams())) {
- ElasticsearchFieldInfo elasticsearchFieldInfo =
ElasticsearchFieldInfo.getFromJson(
- fieldEntity.getExtParams());
- CommonBeanUtils.copyProperties(fieldEntity,
elasticsearchFieldInfo, true);
- esFieldList.add(elasticsearchFieldInfo);
- } else {
- ElasticsearchFieldInfo esFieldInfo = new
ElasticsearchFieldInfo();
- CommonBeanUtils.copyProperties(fieldEntity, esFieldInfo, true);
- esFieldList.add(esFieldInfo);
- }
- }
- return esFieldList;
- }
-
- private ElasticsearchConfig getElasticsearchConfig(SinkInfo sinkInfo,
ElasticsearchSinkDTO esInfo) {
- ElasticsearchConfig config = new ElasticsearchConfig();
- if (StringUtils.isNotEmpty(esInfo.getUsername())) {
- config.setAuthEnable(true);
- config.setUsername(esInfo.getUsername());
- config.setPassword(esInfo.getPassword());
- }
- config.setHosts(esInfo.getHosts());
- return config;
- }
-
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
index 06ea6fac5f..fba388029f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
@@ -17,12 +17,10 @@
package org.apache.inlong.manager.service.sink.es;
-import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.util.AESUtils;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
@@ -44,7 +42,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -81,20 +78,6 @@ public class ElasticsearchSinkOperator extends
AbstractSinkOperator {
try {
ElasticsearchSinkDTO dto =
ElasticsearchSinkDTO.getFromRequest(sinkRequest, targetEntity.getExtParams());
- DataNodeInfo dataNodeInfo =
- dataNodeHelper.getDataNodeInfo(request.getDataNodeName(),
DataNodeType.ELASTICSEARCH);
- String esUrl = dataNodeInfo.getUrl();
- dto.setHosts(esUrl);
-
- dto.setUsername(dataNodeInfo.getUsername());
- Integer encryptVersion = AESUtils.getCurrentVersion(null);
- String passwd = null;
- if (StringUtils.isNotEmpty(dataNodeInfo.getToken())) {
- passwd =
AESUtils.encryptToString(dataNodeInfo.getToken().getBytes(StandardCharsets.UTF_8),
- encryptVersion);
- }
- dto.setPassword(passwd);
- dto.setEncryptVersion(encryptVersion);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchSinkServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchSinkServiceTest.java
index ebf74d9574..964f216c9c 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchSinkServiceTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchSinkServiceTest.java
@@ -61,13 +61,6 @@ public class ElasticsearchSinkServiceTest extends
ServiceBaseTest {
sinkInfo.setInlongStreamId(globalStreamId);
sinkInfo.setSinkType(SinkType.ELASTICSEARCH);
- sinkInfo.setHosts("http://127.0.0.1:9200");
- sinkInfo.setUsername("elasticsearch");
- sinkInfo.setPassword("inlong");
- sinkInfo.setDocumentType("public");
- sinkInfo.setIndexName("index");
- sinkInfo.setPrimaryKey("name,age");
- sinkInfo.setEsVersion(7);
sinkInfo.setDataNodeName(dataNodeName);
sinkInfo.setSinkName(sinkName);