This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f4084f307a [INLONG-8794][Manager] Support add streamField and
sinkField (#8795)
f4084f307a is described below
commit f4084f307a52475a71b87160eaed6ae6e7ac8ef8
Author: fuweng11 <[email protected]>
AuthorDate: Thu Aug 24 17:36:28 2023 +0800
[INLONG-8794][Manager] Support add streamField and sinkField (#8795)
---
.../inlong/manager/pojo/sink/AddFieldRequest.java | 48 +++++++++++++++
.../manager/service/sink/StreamSinkService.java | 10 +++
.../service/sink/StreamSinkServiceImpl.java | 38 ++++++++++++
.../service/stream/InlongStreamService.java | 9 +++
.../service/stream/InlongStreamServiceImpl.java | 48 +++++++++++++++
.../manager/service/stream/InlongStreamTest.java | 72 ++++++++++++++++++++++
.../web/controller/InlongStreamController.java | 7 +++
.../openapi/OpenInLongStreamController.java | 7 +++
8 files changed, 239 insertions(+)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/AddFieldRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/AddFieldRequest.java
new file mode 100644
index 0000000000..1c98992e08
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/AddFieldRequest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * Add field request - with stream
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Add field request - with stream")
+public class AddFieldRequest {
+
+ @ApiModelProperty("Inlong group id")
+ private String inlongGroupId;
+
+ @ApiModelProperty("Inlong stream id")
+ private String inlongStreamId;
+
+ @ApiModelProperty("Sink field list")
+ private List<SinkField> sinkFieldList;
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index 764d2b62cb..06f97bb762 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.sink;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
@@ -248,6 +249,15 @@ public interface StreamSinkService {
*/
Boolean updateAfterApprove(List<SinkApproveDTO> sinkApproveList, String
operator);
+ /**
+ * Add field info
+ *
+ * @param sinkEntity stream sink information
+ * @param sinkFieldList sink field info
+ * @return whether succeed
+ */
+ boolean addFields(StreamSinkEntity sinkEntity, List<SinkField>
sinkFieldList);
+
/**
* Converts a statement to a sinkFields
*
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index b9c0ffe40f..31092bdb0b 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -24,10 +24,12 @@ import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
@@ -78,6 +80,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -700,6 +703,41 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
return true;
}
+ @Override
+ public boolean addFields(StreamSinkEntity sinkEntity, List<SinkField>
sinkFieldList) {
+ Set<String> existFields =
sinkFieldMapper.selectBySinkId(sinkEntity.getId()).stream()
+
.map(StreamSinkFieldEntity::getFieldName).collect(Collectors.toSet());
+
+ LOGGER.debug("begin to save sink fields={}", sinkFieldList);
+ if (CollectionUtils.isEmpty(sinkFieldList)) {
+ return true;
+ }
+ List<StreamSinkFieldEntity> needAddFieldList = new ArrayList<>();
+ for (SinkField fieldInfo : sinkFieldList) {
+ if (existFields.contains(fieldInfo.getFieldName())) {
+ LOGGER.debug("current sink field={} is exist for groupId={},
streamId={}", fieldInfo.getFieldName(),
+ sinkEntity.getInlongGroupId(),
sinkEntity.getInlongStreamId());
+ continue;
+ }
+ StreamSinkFieldEntity fieldEntity =
CommonBeanUtils.copyProperties(fieldInfo,
+ StreamSinkFieldEntity::new);
+ if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
+ fieldEntity.setFieldComment(fieldEntity.getFieldName());
+ }
+ fieldEntity.setInlongGroupId(sinkEntity.getInlongGroupId());
+ fieldEntity.setInlongStreamId(sinkEntity.getInlongStreamId());
+ fieldEntity.setSinkType(sinkEntity.getSinkType());
+ fieldEntity.setSinkId(sinkEntity.getId());
+ fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
+ needAddFieldList.add(fieldEntity);
+ }
+ if (CollectionUtils.isNotEmpty(needAddFieldList)) {
+ sinkFieldMapper.insertAll(needAddFieldList);
+ }
+ LOGGER.debug("success to save sink fields={}", needAddFieldList);
+ return true;
+ }
+
@Override
public List<SinkField> parseFields(ParseFieldRequest parseFieldRequest) {
try {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
index f0f53cd409..328e9ae73c 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.stream;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
+import org.apache.inlong.manager.pojo.sink.AddFieldRequest;
import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamApproveRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
@@ -199,6 +200,14 @@ public interface InlongStreamService {
*/
Boolean logicDeleteAll(String groupId, String operator);
+ /**
+ * Add field for stream or sink
+ *
+ * @param addFieldRequest add field request
+ * @return true or false
+ */
+ boolean addFields(AddFieldRequest addFieldRequest);
+
/**
* According to the group id, query the number of valid inlong streams
belonging to this service
*
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 9083c3d8de..b9562d5f4e 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -29,17 +29,21 @@ import
org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sink.AddFieldRequest;
import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
+import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
import org.apache.inlong.manager.pojo.source.StreamSource;
@@ -90,6 +94,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
import static
org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_COMMENT_PROP;
@@ -127,6 +132,8 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
@Autowired
private StreamSinkService sinkService;
@Autowired
+ private StreamSinkEntityMapper sinkMapper;
+ @Autowired
private ObjectMapper objectMapper;
@Autowired
@Lazy
@@ -756,6 +763,47 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
LOGGER.info("success to logic delete dlq or rlq by groupId={},
topicName={}", groupId, topicName);
}
+ @Override
+ public boolean addFields(AddFieldRequest addFieldsRequest) {
+ String groupId = addFieldsRequest.getInlongGroupId();
+ String streamId = addFieldsRequest.getInlongStreamId();
+ try {
+ LOGGER.info("begin to add inlong stream fields ={}",
addFieldsRequest.getSinkFieldList());
+ Set<String> existFieldList =
streamFieldMapper.selectByIdentifier(groupId, streamId).stream()
+
.map(InlongStreamFieldEntity::getFieldName).collect(Collectors.toSet());
+ List<InlongStreamFieldEntity> needAddFieldList = new ArrayList<>();
+ for (SinkField sinkField : addFieldsRequest.getSinkFieldList()) {
+ if (existFieldList.contains(sinkField.getSourceFieldName())) {
+ LOGGER.info("current stream field={} is exist for
groupId={}, streamId={}",
+ sinkField.getSourceFieldName(), groupId, streamId);
+ continue;
+ }
+ InlongStreamFieldEntity entity = new InlongStreamFieldEntity();
+ entity.setFieldName(sinkField.getSourceFieldName());
+ entity.setFieldType(sinkField.getSourceFieldType());
+ entity.setFieldComment(sinkField.getFieldComment());
+ entity.setInlongGroupId(groupId);
+ entity.setInlongStreamId(streamId);
+ entity.setIsDeleted(InlongConstants.UN_DELETED);
+ needAddFieldList.add(entity);
+ }
+ if (CollectionUtils.isNotEmpty(needAddFieldList)) {
+ streamFieldMapper.insertAll(needAddFieldList);
+ }
+ List<StreamSinkEntity> sinkEntityList =
sinkMapper.selectByRelatedId(groupId, streamId);
+ for (StreamSinkEntity sink : sinkEntityList) {
+ sinkService.addFields(sink,
addFieldsRequest.getSinkFieldList());
+ }
+ LOGGER.debug("success add inlong stream fields={}",
needAddFieldList);
+
+ } catch (Exception e) {
+ LOGGER.error("add inlong stream fields error for groupId={},
streamId={}", groupId, streamId, e);
+ throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+ String.format("add stream fields error : %s",
e.getMessage()));
+ }
+ return true;
+ }
+
@Override
public List<StreamField> parseFields(ParseFieldRequest parseFieldRequest) {
try {
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java
index d5602c2bf2..f917a8ae29 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/stream/InlongStreamTest.java
@@ -17,10 +17,19 @@
package org.apache.inlong.manager.service.stream;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.sink.AddFieldRequest;
import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.hive.HiveSinkRequest;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.junit.jupiter.api.Assertions;
@@ -35,8 +44,22 @@ import static
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_
public class InlongStreamTest extends ServiceBaseTest {
+ private final String globalGroupId = "default_group";
+ private final String globalStreamId = "default_stream";
+ private final String globalOperator = "admin";
+ private final String sinkName = "default_sink";
+ private final String jdbcUrl = "127.0.0.1:8080";
+
+ @Autowired
+ private InlongStreamService streamService;
@Autowired
protected StreamSinkService streamSinkService;
+ @Autowired
+ private InlongStreamServiceTest streamServiceTest;
+ @Autowired
+ private InlongStreamFieldEntityMapper streamFieldEntityMapper;
+ @Autowired
+ private StreamSinkFieldEntityMapper streamSinkFieldEntityMapper;
@Test
public void testParseStreamFieldsByJson() {
@@ -125,4 +148,53 @@ public class InlongStreamTest extends ServiceBaseTest {
Assertions.assertArrayEquals(expectResult, result);
}
+ @Test
+ public void testAddFieldsForStream() {
+ streamServiceTest.saveInlongStream(globalGroupId, globalStreamId,
globalOperator);
+ HiveSinkRequest sinkInfo = new HiveSinkRequest();
+ sinkInfo.setInlongGroupId(globalGroupId);
+ sinkInfo.setInlongStreamId(globalStreamId);
+ sinkInfo.setSinkType(SinkType.HIVE);
+
sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
+ sinkInfo.setSinkName(sinkName);
+ sinkInfo.setJdbcUrl(jdbcUrl);
+ Integer id = streamSinkService.save(sinkInfo, globalOperator);
+ AddFieldRequest addFieldRequest = new AddFieldRequest();
+ addFieldRequest.setInlongGroupId(globalGroupId);
+ addFieldRequest.setInlongStreamId(globalStreamId);
+ List<SinkField> sinkFields = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ SinkField field = new SinkField();
+ field.setInlongGroupId(globalGroupId);
+ field.setInlongStreamId(globalStreamId);
+ field.setSourceFieldName("sinkFieldName" + i);
+ field.setSourceFieldType("string");
+ field.setFieldName("sinkFieldName" + i);
+ field.setFieldType("string");
+ if (i == 0) {
+ field.setFieldComment("desc0 content");
+ }
+ sinkFields.add(field);
+ }
+ addFieldRequest.setSinkFieldList(sinkFields);
+ streamService.addFields(addFieldRequest);
+ for (int i = 1; i < 5; i++) {
+ SinkField field = new SinkField();
+ field.setInlongGroupId(globalGroupId);
+ field.setInlongStreamId(globalStreamId);
+ field.setSourceFieldName("sinkFieldName" + i);
+ field.setSourceFieldType("string");
+ field.setFieldName("sinkFieldName" + i);
+ field.setFieldType("string");
+ sinkFields.add(field);
+ }
+ streamService.addFields(addFieldRequest);
+ List<InlongStreamFieldEntity> streamFieldEntityList =
+ streamFieldEntityMapper.selectByIdentifier(globalGroupId,
globalStreamId);
+ List<StreamSinkFieldEntity> sinkFieldEntityList =
streamSinkFieldEntityMapper.selectBySinkId(id);
+ Assertions.assertEquals(5, streamFieldEntityList.size());
+ Assertions.assertEquals(5, sinkFieldEntityList.size());
+
+ }
+
}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
index 64bbeb4671..2407cdd283 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
@@ -25,6 +25,7 @@ import
org.apache.inlong.manager.common.validation.UpdateValidation;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
+import org.apache.inlong.manager.pojo.sink.AddFieldRequest;
import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
@@ -200,6 +201,12 @@ public class InlongStreamController {
return Response.success(streamService.delete(groupId, streamId,
username));
}
+ @RequestMapping(value = "/stream/addFields", method = RequestMethod.POST)
+ @ApiOperation(value = "Add inlong stream fields")
+ public Response<Boolean> addFields(@RequestBody AddFieldRequest
addFieldsRequest) {
+ return Response.success(streamService.addFields(addFieldsRequest));
+ }
+
@RequestMapping(value = "/stream/parseFields", method = RequestMethod.POST)
@ApiOperation(value = "Parse inlong stream fields from statement")
public Response<List<StreamField>> parseFields(@RequestBody
ParseFieldRequest parseFieldRequest) {
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
index 5e8f87fa3b..864b278851 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.OperationType;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.common.validation.UpdateValidation;
import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.sink.AddFieldRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
@@ -137,4 +138,10 @@ public class OpenInLongStreamController {
String operator = LoginUserUtils.getLoginUser().getName();
return Response.success(streamProcessOperation.startProcess(groupId,
streamId, operator, sync));
}
+
+ @RequestMapping(value = "/stream/addFields", method = RequestMethod.POST)
+ @ApiOperation(value = "Add inlong stream fields")
+ public Response<Boolean> addFields(@RequestBody AddFieldRequest
addFieldsRequest) {
+ return Response.success(streamService.addFields(addFieldsRequest));
+ }
}