This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5c39c087b1 [INLONG-10911][Manager] Support pagination to query sort
task details information (#10912)
5c39c087b1 is described below
commit 5c39c087b11750b6498ff80fbc5a9d686f728881
Author: fuweng11 <[email protected]>
AuthorDate: Tue Aug 27 13:05:07 2024 +0800
[INLONG-10911][Manager] Support pagination to query sort task details
information (#10912)
---
.../manager/service/sink/StreamSinkService.java | 9 ++
.../service/sink/StreamSinkServiceImpl.java | 103 ++++++++++++++++++++-
.../web/controller/StreamSinkController.java | 7 ++
3 files changed, 117 insertions(+), 2 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index ea46dc3432..a0b6a29443 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -115,6 +115,15 @@ public interface StreamSinkService {
*/
PageResult<? extends StreamSink> listByCondition(SinkPageRequest request,
String operator);
+ /**
+ * Paging query stream sink detail info based on conditions.
+ *
+ * @param request paging request
+ * @param operator operator
+ * @return sink detail page list
+ */
+ PageResult<Map<String, Object>> listDetail(SinkPageRequest request, String
operator);
+
/**
* Paging query stream sink info based on conditions.
*
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 2180400305..9177253d90 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -17,6 +17,8 @@
package org.apache.inlong.manager.service.sink;
+import org.apache.inlong.common.constant.Constants;
+import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.OperationTarget;
@@ -25,21 +27,31 @@ import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
+import org.apache.inlong.manager.dao.entity.SortConfigEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
+import org.apache.inlong.manager.dao.mapper.SortConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
+import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQInfo;
import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.sink.SinkApproveDTO;
import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
@@ -93,6 +105,8 @@ import static
org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NO
import static
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_CSV;
import static
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
import static
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
+import static
org.apache.inlong.manager.service.resource.queue.pulsar.PulsarQueueResourceOperator.PULSAR_SUBSCRIPTION;
+import static
org.apache.inlong.manager.service.resource.queue.tubemq.TubeMQQueueResourceOperator.TUBE_CONSUMER_GROUP;
/**
* Implementation of sink service interface
@@ -104,7 +118,10 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
private static final Pattern PARSE_FIELD_CSV_SPLITTER =
Pattern.compile("[\t\\s,]");
private static final int PARSE_FIELD_CSV_MAX_COLUMNS = 3;
private static final int PARSE_FIELD_CSV_MIN_COLUMNS = 2;
-
+ @Autowired
+ private SortConfigEntityMapper sortConfigEntityMapper;
+ @Autowired
+ private InlongClusterEntityMapper clusterEntityMapper;
@Autowired
private SinkOperatorFactory operatorFactory;
@Autowired
@@ -121,7 +138,6 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
private AutowireCapableBeanFactory autowireCapableBeanFactory;
@Autowired
private ObjectMapper objectMapper;
-
// To avoid circular dependencies, you cannot use @Autowired, it will be
injected by AutowireCapableBeanFactory
private InlongStreamProcessService streamProcessOperation;
@@ -297,6 +313,89 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
return pageResult;
}
+ @Override
+ public PageResult<Map<String, Object>> listDetail(SinkPageRequest request,
String operator) {
+ PageHelper.startPage(request.getPageNum(), request.getPageSize());
+ OrderFieldEnum.checkOrderField(request);
+ OrderTypeEnum.checkOrderType(request);
+ Page<StreamSinkEntity> entityPage = (Page<StreamSinkEntity>)
sinkMapper.selectByCondition(request);
+ InlongGroupEntity groupEntity =
groupMapper.selectByGroupId(request.getInlongGroupId());
+ InlongGroupInfo groupInfo = null;
+ switch (groupEntity.getMqType()) {
+ case MQType.PULSAR:
+ groupInfo = CommonBeanUtils.copyProperties(groupEntity,
InlongPulsarInfo::new, true);
+ break;
+ case MQType.TUBEMQ:
+ groupInfo = CommonBeanUtils.copyProperties(groupEntity,
InlongTubeMQInfo::new, true);
+ break;
+ case MQType.KAFKA:
+ groupInfo = CommonBeanUtils.copyProperties(groupEntity,
InlongKafkaInfo::new, true);
+ default:
+ throw new
BusinessException(ErrorCodeEnum.MQ_TYPE_NOT_SUPPORTED.getMessage());
+ }
+ InlongGroupInfo finalGroupInfo = groupInfo;
+ List<Map<String, Object>> responseList = entityPage.stream().map(sink
-> {
+ StreamSinkOperator sinkOperator =
operatorFactory.getInstance(sink.getSinkType());
+ StreamSink streamSink = sinkOperator.getFromEntity(sink);
+ Map<String, Object> requestMap =
JsonUtils.OBJECT_MAPPER.convertValue(streamSink,
+ new TypeReference<Map<String, Object>>() {
+ });
+ InlongStreamEntity streamEntity =
+
streamMapper.selectByIdentifier(request.getInlongGroupId(),
sink.getInlongStreamId());
+ String topic = "";
+ String consumeGroup = "";
+ switch (groupEntity.getMqType()) {
+ case MQType.PULSAR:
+ List<InlongClusterEntity> pulsarClusters =
clusterEntityMapper.selectByKey(
+ finalGroupInfo.getInlongClusterTag(), null,
MQType.PULSAR);
+ InlongPulsarDTO pulsarDTO =
InlongPulsarDTO.getFromJson(groupEntity.getExtParams());
+ if (CollectionUtils.isEmpty(pulsarClusters)) {
+ break;
+ }
+ String tenant = pulsarDTO.getPulsarTenant();
+ if (StringUtils.isBlank(tenant)) {
+ InlongClusterEntity pulsarCluster =
pulsarClusters.get(0);
+ // Multiple adminUrls should be configured for pulsar,
+ // otherwise all requests will be sent to the same
broker
+ PulsarClusterDTO pulsarClusterDTO =
PulsarClusterDTO.getFromJson(pulsarCluster.getExtParams());
+ tenant = pulsarClusterDTO.getPulsarTenant();
+ }
+ String fullTopicName =
+ tenant + "/" + finalGroupInfo.getMqResource() +
"/" + streamEntity.getMqResource();
+ topic = "persistent://" + fullTopicName;
+ consumeGroup = String.format(PULSAR_SUBSCRIPTION,
finalGroupInfo.getInlongClusterTag(),
+ fullTopicName, sink.getId());
+ break;
+ case MQType.TUBEMQ:
+ topic = streamEntity.getMqResource();
+ consumeGroup = String.format(TUBE_CONSUMER_GROUP,
groupEntity.getInlongClusterTag(), topic,
+ sink.getId());
+ break;
+ case MQType.KAFKA:
+ topic = streamEntity.getMqResource();
+ if (topic.equals(streamEntity.getInlongStreamId())) {
+ // the default mq resource (stream id) is not
sufficient to discriminate different kafka topics
+ topic =
String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT,
+ finalGroupInfo.getMqResource(),
streamEntity.getMqResource());
+ }
+ break;
+ default:
+ throw new
BusinessException(ErrorCodeEnum.MQ_TYPE_NOT_SUPPORTED.getMessage());
+ }
+ requestMap.put("topic", topic);
+ requestMap.put("consumerGroup", consumeGroup);
+ SortConfigEntity sortConfigEntity =
sortConfigEntityMapper.selectBySinkId(sink.getId());
+ if (sortConfigEntity != null) {
+ requestMap.put("dataFlowInfo",
sortConfigEntity.getConfigParams());
+ }
+ return requestMap;
+ }).collect(Collectors.toList());
+ PageResult<Map<String, Object>> pageResult = new
PageResult<>(responseList, entityPage.getTotal(),
+ entityPage.getPageNum(), entityPage.getPageSize());
+ LOGGER.debug("success to list sink detail page, result size {}",
pageResult.getList().size());
+ return pageResult;
+ }
+
@Override
public List<? extends StreamSink> listByCondition(SinkPageRequest request,
UserInfo opInfo) {
// check sink id
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index 4fd4eaabb6..b7b8dbd5d4 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -48,6 +48,7 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
+import java.util.Map;
/**
* Stream sink control layer
@@ -88,6 +89,12 @@ public class StreamSinkController {
return Response.success(sinkService.listByCondition(request,
LoginUserUtils.getLoginUser().getName()));
}
+ @RequestMapping(value = "/sink/listDetail", method = RequestMethod.POST)
+ @ApiOperation(value = "List stream sinks detail by paginating")
+ public Response<PageResult<Map<String, Object>>> listDetail(@RequestBody
SinkPageRequest request) {
+ return Response.success(sinkService.listDetail(request,
LoginUserUtils.getLoginUser().getName()));
+ }
+
@RequestMapping(value = "/sink/update", method = RequestMethod.POST)
@OperationLog(operation = OperationType.UPDATE, operationTarget =
OperationTarget.SINK)
@ApiOperation(value = "Update stream sink")