This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d20a29cb14 [INLONG-10638][Manager] Data preview supports filtering
function (#10639)
d20a29cb14 is described below
commit d20a29cb146400889958fd7b4088f92b5e4fbf8a
Author: fuweng11 <[email protected]>
AuthorDate: Wed Jul 17 19:11:08 2024 +0800
[INLONG-10638][Manager] Data preview supports filtering function (#10639)
---
.../api/inner/client/InlongStreamClient.java | 17 +++++--
.../client/api/service/InlongStreamApi.java | 5 +-
.../inlong/manager/common/util/JsonUtils.java | 20 ++++++++
.../manager/pojo/stream/QueryMessageRequest.java | 54 ++++++++++++++++++++++
.../service/message/DeserializeOperator.java | 34 +++++++++++++-
.../message/InlongMsgDeserializeOperator.java | 10 ++--
.../service/message/PbMsgDeserializeOperator.java | 14 ++++--
.../service/message/RawMsgDeserializeOperator.java | 11 +++--
.../resource/queue/QueueResourceOperator.java | 5 +-
.../resource/queue/kafka/KafkaOperator.java | 11 +++--
.../queue/kafka/KafkaQueueResourceOperator.java | 7 +--
.../resource/queue/pulsar/PulsarOperator.java | 15 +++---
.../queue/pulsar/PulsarQueueResourceOperator.java | 11 +++--
.../queue/pulsar/QueryLatestMessagesRunnable.java | 9 ++--
.../resource/queue/tubemq/TubeMQOperator.java | 8 ++--
.../queue/tubemq/TubeMQQueueResourceOperator.java | 5 +-
.../service/stream/InlongStreamService.java | 7 ++-
.../service/stream/InlongStreamServiceImpl.java | 9 ++--
.../resource/queue/kafka/KafkaOperatorTest.java | 20 +++++---
.../web/controller/InlongStreamController.java | 11 ++---
20 files changed, 209 insertions(+), 74 deletions(-)
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
index 89ca7eef68..e5573d2db1 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
@@ -21,6 +21,7 @@ import
org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.service.InlongStreamApi;
import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
@@ -30,11 +31,15 @@ import
org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
import org.apache.inlong.manager.pojo.stream.StreamField;
+import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import static
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
import static
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
@@ -295,11 +300,15 @@ public class InlongStreamClient {
return parseFields(request);
}
- public List<BriefMQMessage> queryMessage(String groupId, String streamId,
Integer messageCount) {
- Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
- Preconditions.expectNotBlank(streamId,
ErrorCodeEnum.STREAM_ID_IS_EMPTY);
+ public List<BriefMQMessage> queryMessage(QueryMessageRequest request) {
+ Preconditions.expectNotBlank(request.getGroupId(),
ErrorCodeEnum.GROUP_ID_IS_EMPTY);
+ Preconditions.expectNotBlank(request.getStreamId(),
ErrorCodeEnum.STREAM_ID_IS_EMPTY);
+ Map<String, Object> requestMap = JsonUtils.parseObject(request,
+ new TypeReference<Map<String, Object>>() {
+ });
+ requestMap.entrySet().removeIf(entry ->
Objects.isNull(entry.getValue()));
Response<List<BriefMQMessage>> response = ClientUtils.executeHttpCall(
- inlongStreamApi.listMessages(groupId, streamId, messageCount));
+ inlongStreamApi.listMessages(requestMap));
ClientUtils.assertRespSuccess(response);
return response.getData();
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
index 6b47d8aeb2..ac31b09d20 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
@@ -34,8 +34,10 @@ import retrofit2.http.GET;
import retrofit2.http.POST;
import retrofit2.http.Path;
import retrofit2.http.Query;
+import retrofit2.http.QueryMap;
import java.util.List;
+import java.util.Map;
public interface InlongStreamApi {
@@ -88,6 +90,5 @@ public interface InlongStreamApi {
Call<Response<List<StreamField>>> parseFields(@Body ParseFieldRequest
parseFieldRequest);
@GET("stream/listMessages")
- Call<Response<List<BriefMQMessage>>> listMessages(@Query("groupId") String
groupId,
- @Query("streamId") String streamId, @Query("messageCount") Integer
messageCount);
+ Call<Response<List<BriefMQMessage>>> listMessages(@QueryMap Map<String,
Object> query);
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
index de7f5647c1..8f21ac6364 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
@@ -121,6 +121,26 @@ public class JsonUtils {
}
}
+ /**
+ * Convert an object to a Java object of the given type.
+ * <p/>
+ * The conversion is done by {@code OBJECT_MAPPER.convertValue}, so no JSON string
+ * is needed as input, e.g. a POJO can be converted to a {@code Map<String, Object>}.
+ *
+ * @param object object
+ * @param typeReference The generic type is actually the parsed java type
+ * @return the converted Java object
+ * @throws JsonException when the conversion fails
+ */
+ public static <T> T parseObject(Object object, TypeReference<T>
typeReference) {
+ try {
+ return OBJECT_MAPPER.convertValue(object, typeReference);
+ } catch (Exception e) {
+ log.error("json parse err for: " + object, e);
+ throw new JsonException(e);
+ }
+ }
+
/**
* Parse JSON string to Java object.
* <p/>
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/QueryMessageRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/QueryMessageRequest.java
new file mode 100644
index 0000000000..3c939222e5
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/QueryMessageRequest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.stream;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Query message request
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("query message request")
+public class QueryMessageRequest {
+
+ @ApiModelProperty(value = "Inlong group id")
+ private String groupId;
+
+ @ApiModelProperty(value = "Inlong stream id")
+ private String streamId;
+
+ @ApiModelProperty(value = "Message count")
+ private Integer messageCount = 100;
+
+ @ApiModelProperty(value = "Field name")
+ private String fieldName;
+
+ @ApiModelProperty(value = "Operation type, like !=, =, like")
+ private String operationType;
+
+ @ApiModelProperty(value = "TargetValue")
+ private String targetValue;
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
index c62d3f6862..1fb898da99 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
@@ -21,10 +21,15 @@ import org.apache.inlong.common.enums.MessageWrapType;
import
org.apache.inlong.common.pojo.sort.dataflow.deserialization.DeserializationConfig;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
+import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
+
+import org.apache.commons.lang3.StringUtils;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
/**
* Deserialize of message operator
@@ -55,8 +60,8 @@ public interface DeserializeOperator {
* @param index message index
* @return list of brief mq message info
*/
- default List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo,
- byte[] msgBytes, Map<String, String> headers, int index) throws
Exception {
+ default List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo,
List<BriefMQMessage> briefMQMessages,
+ byte[] msgBytes, Map<String, String> headers, int index,
QueryMessageRequest request) throws Exception {
return null;
}
@@ -64,4 +69,29 @@ public interface DeserializeOperator {
throw new BusinessException(String.format("current type not support
DeserializationInfo for wrapType=%s",
streamInfo.getWrapType()));
}
+
+ default Boolean checkIfFilter(QueryMessageRequest request, List<FieldInfo>
streamFieldList) {
+ if (StringUtils.isBlank(request.getFieldName()) ||
StringUtils.isBlank(request.getOperationType())
+ || StringUtils.isBlank(request.getTargetValue())) {
+ return false;
+ }
+ boolean ifFilter = false;
+ FieldInfo fieldInfo = streamFieldList.stream()
+ .filter(v -> Objects.equals(v.getFieldName(),
request.getFieldName())).findFirst()
+ .orElse(null);
+ if (fieldInfo != null) {
+ switch (request.getOperationType()) {
+ case "=":
+ ifFilter = !Objects.equals(request.getTargetValue(),
fieldInfo.getFieldValue());
+ break;
+ case "!=":
+ ifFilter = Objects.equals(request.getTargetValue(),
fieldInfo.getFieldValue());
+ break;
+ case "like":
+ ifFilter = fieldInfo.getFieldValue() != null
+ &&
!fieldInfo.getFieldValue().contains(request.getTargetValue());
+ }
+ }
+ return ifFilter;
+ }
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
index b04931db0f..10778bbc20 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
@@ -28,6 +28,7 @@ import
org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
import org.apache.inlong.manager.service.datatype.DataTypeOperator;
import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory;
@@ -36,7 +37,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.Charset;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -55,11 +55,10 @@ public class InlongMsgDeserializeOperator implements
DeserializeOperator {
}
@Override
- public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, byte[]
msgBytes, Map<String, String> headers,
- int index) {
+ public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo,
List<BriefMQMessage> messageList,
+ byte[] msgBytes, Map<String, String> headers, int index,
QueryMessageRequest request) {
String groupId = headers.get(AttributeConstants.GROUP_ID);
String streamId = headers.get(AttributeConstants.STREAM_ID);
- List<BriefMQMessage> messageList = new ArrayList<>();
InLongMsg inLongMsg = InLongMsg.parseFrom(msgBytes);
for (String attr : inLongMsg.getAttrs()) {
Map<String, String> attrMap = StringUtil.splitKv(attr,
INLONGMSG_ATTR_ENTRY_DELIMITER,
@@ -87,6 +86,9 @@ public class InlongMsgDeserializeOperator implements
DeserializeOperator {
DataTypeOperator dataTypeOperator =
dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType()));
List<FieldInfo> streamFieldList =
dataTypeOperator.parseFields(body, streamInfo);
+ if (checkIfFilter(request, streamFieldList)) {
+ continue;
+ }
BriefMQMessage message = BriefMQMessage.builder()
.id(index)
.inlongGroupId(groupId)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
index 79760cf737..e8e32ab80a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
@@ -27,6 +27,7 @@ import
org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
import org.apache.inlong.manager.service.datatype.DataTypeOperator;
import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
@@ -58,8 +59,8 @@ public class PbMsgDeserializeOperator implements
DeserializeOperator {
}
@Override
- public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo,
- byte[] msgBytes, Map<String, String> headers, int index) throws
Exception {
+ public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo,
List<BriefMQMessage> briefMQMessages,
+ byte[] msgBytes, Map<String, String> headers, int index,
QueryMessageRequest request) throws Exception {
int compressType =
Integer.parseInt(headers.getOrDefault(COMPRESS_TYPE_KEY, "0"));
byte[] values = msgBytes;
switch (compressType) {
@@ -74,10 +75,12 @@ public class PbMsgDeserializeOperator implements
DeserializeOperator {
default:
throw new IllegalArgumentException("Unknown compress type:" +
compressType);
}
- return transformMessageObjs(MessageObjs.parseFrom(values), streamInfo,
index);
+
briefMQMessages.addAll(transformMessageObjs(MessageObjs.parseFrom(values),
streamInfo, index, request));
+ return briefMQMessages;
}
- private List<BriefMQMessage> transformMessageObjs(MessageObjs messageObjs,
InlongStreamInfo streamInfo, int index) {
+ private List<BriefMQMessage> transformMessageObjs(MessageObjs messageObjs,
InlongStreamInfo streamInfo, int index,
+ QueryMessageRequest request) {
if (null == messageObjs) {
return null;
}
@@ -96,6 +99,9 @@ public class PbMsgDeserializeOperator implements
DeserializeOperator {
DataTypeOperator dataTypeOperator =
dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType()));
List<FieldInfo> streamFieldList =
dataTypeOperator.parseFields(body, streamInfo);
+ if (checkIfFilter(request, streamFieldList)) {
+ continue;
+ }
BriefMQMessage message = BriefMQMessage.builder()
.id(index)
.inlongGroupId(headers.get(AttributeConstants.GROUP_ID))
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
index db025d2c2f..d55c7d6cad 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
@@ -24,6 +24,7 @@ import
org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
import org.apache.inlong.manager.service.datatype.DataTypeOperator;
import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory;
@@ -49,8 +50,8 @@ public class RawMsgDeserializeOperator implements
DeserializeOperator {
}
@Override
- public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo,
- byte[] msgBytes, Map<String, String> headers, int index) {
+ public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo,
List<BriefMQMessage> briefMQMessages,
+ byte[] msgBytes, Map<String, String> headers, int index,
QueryMessageRequest request) {
String groupId = headers.get(AttributeConstants.GROUP_ID);
String streamId = headers.get(AttributeConstants.STREAM_ID);
long msgTime = Long.parseLong(headers.getOrDefault(MSG_TIME_KEY, "0"));
@@ -59,6 +60,9 @@ public class RawMsgDeserializeOperator implements
DeserializeOperator {
DataTypeOperator dataTypeOperator =
dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType()));
List<FieldInfo> fieldList = dataTypeOperator.parseFields(body,
streamInfo);
+ if (checkIfFilter(request, fieldList)) {
+ return briefMQMessages;
+ }
BriefMQMessage briefMQMessage = BriefMQMessage.builder()
.id(index)
.inlongGroupId(groupId)
@@ -69,7 +73,8 @@ public class RawMsgDeserializeOperator implements
DeserializeOperator {
.body(body)
.fieldList(fieldList)
.build();
- return Collections.singletonList(briefMQMessage);
+ briefMQMessages.addAll(Collections.singletonList(briefMQMessage));
+ return briefMQMessages;
} catch (Exception e) {
String errMsg = String.format("decode msg failed for groupId=%s,
streamId=%s", groupId, streamId);
log.error(errMsg, e);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java
index 4664f670a6..528884c996 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
@@ -81,12 +82,12 @@ public interface QueueResourceOperator {
*
* @param groupInfo inlong group info
* @param streamInfo inlong stream info
- * @param messageCount count of messages to query
+ * @param request query message request
* @throws Exception any exception if occurred
* @return query brief mq message info
*/
default List<BriefMQMessage> queryLatestMessages(InlongGroupInfo
groupInfo, InlongStreamInfo streamInfo,
- Integer messageCount) throws Exception {
+ QueryMessageRequest request) throws Exception {
return null;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
index 763a654c26..b49e52317a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
@@ -24,6 +24,7 @@ import
org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
import org.apache.inlong.manager.service.message.DeserializeOperator;
import org.apache.inlong.manager.service.message.DeserializeOperatorFactory;
@@ -113,19 +114,19 @@ public class KafkaOperator {
* Query topic message for the given Kafka cluster.
*/
public List<BriefMQMessage> queryLatestMessage(KafkaClusterInfo
clusterInfo, String topicName,
- String consumeGroup, Integer messageCount, InlongStreamInfo
streamInfo) {
+ String consumeGroup, InlongStreamInfo streamInfo,
QueryMessageRequest request) {
LOGGER.debug("begin to query message for topic {} in cluster: {}",
topicName, clusterInfo);
Properties properties = getProperties(clusterInfo.getUrl(),
consumeGroup);
KafkaConsumer<byte[], byte[]> consumer = new
KafkaConsumer<>(properties);
- return getLatestMessage(consumer, topicName, messageCount, streamInfo);
+ return getLatestMessage(consumer, topicName, streamInfo, request);
}
@VisibleForTesting
public List<BriefMQMessage> getLatestMessage(Consumer<byte[], byte[]>
consumer, String topicName,
- Integer messageCount, InlongStreamInfo streamInfo) {
+ InlongStreamInfo streamInfo, QueryMessageRequest request) {
List<BriefMQMessage> messageList = new ArrayList<>();
-
+ Integer messageCount = request.getMessageCount();
try {
List<PartitionInfo> partitionInfoList =
consumer.partitionsFor(topicName);
List<TopicPartition> topicPartitionList =
partitionInfoList.stream()
@@ -162,7 +163,7 @@ public class KafkaOperator {
MessageWrapType.valueOf(Integer.parseInt(headers.get(InlongConstants.MSG_ENCODE_VER)));
}
DeserializeOperator deserializeOperator =
deserializeOperatorFactory.getInstance(messageWrapType);
- messageList.addAll(deserializeOperator.decodeMsg(streamInfo,
record.value(), headers, index));
+ deserializeOperator.decodeMsg(streamInfo, messageList,
record.value(), headers, index, request);
if (messageList.size() >= messageCount) {
break;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaQueueResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaQueueResourceOperator.java
index cc0fa83dd2..6708bd8ecd 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaQueueResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaQueueResourceOperator.java
@@ -32,6 +32,7 @@ import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.consume.InlongConsumeService;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
@@ -192,7 +193,7 @@ public class KafkaQueueResourceOperator implements
QueueResourceOperator {
@Override
public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo,
InlongStreamInfo streamInfo,
- Integer messageCount) {
+ QueryMessageRequest request) {
ClusterInfo clusterInfo =
clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
String topicName = streamInfo.getMqResource();
@@ -204,8 +205,8 @@ public class KafkaQueueResourceOperator implements
QueueResourceOperator {
String consumeGroup =
String.format(KAFKA_CONSUMER_GROUP_REALTIME_REVIEW,
groupInfo.getInlongClusterTag(), topicName);
- return kafkaOperator.queryLatestMessage((KafkaClusterInfo)
clusterInfo, topicName, consumeGroup, messageCount,
- streamInfo);
+ return kafkaOperator.queryLatestMessage((KafkaClusterInfo)
clusterInfo, topicName, consumeGroup, streamInfo,
+ request);
}
@Override
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 5c9e9f5d10..e5e8d4c815 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -34,6 +34,7 @@ import
org.apache.inlong.manager.pojo.queue.pulsar.PulsarTenantInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicMetadata;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
import org.apache.inlong.manager.service.message.DeserializeOperator;
import org.apache.inlong.manager.service.message.DeserializeOperatorFactory;
@@ -409,16 +410,16 @@ public class PulsarOperator {
* Query topic message for the given pulsar cluster.
*/
public List<BriefMQMessage> queryLatestMessage(PulsarClusterInfo
pulsarClusterInfo, String topicFullName,
- Integer messageCount, InlongStreamInfo streamInfo, boolean serial)
{
+ QueryMessageRequest request, InlongStreamInfo streamInfo, boolean
serial) {
LOGGER.info("begin to query message for topic {}", topicFullName);
List<BriefMQMessage> messageList = new ArrayList<>();
int partitionCount = getPartitionCount(pulsarClusterInfo,
topicFullName);
- for (int messageIndex = 0; messageIndex < messageCount;
messageIndex++) {
+ for (int messageIndex = 0; messageIndex < 100; messageIndex++) {
int currentPartitionNum = messageIndex % partitionCount;
int messagePosition = messageIndex / partitionCount + 1;
String topicNameOfPartition =
buildTopicNameOfPartition(topicFullName, currentPartitionNum, serial);
- messageList.addAll(queryMessageFromPulsar(topicNameOfPartition,
pulsarClusterInfo, messageIndex,
- streamInfo, messagePosition));
+ messageList.addAll(queryMessageFromPulsar(topicNameOfPartition,
pulsarClusterInfo, messageIndex, streamInfo,
+ messagePosition, request));
}
LOGGER.info("success query message for topic={}", topicFullName);
return messageList;
@@ -444,8 +445,7 @@ public class PulsarOperator {
* Query pulsar message.
*/
private List<BriefMQMessage> queryMessageFromPulsar(String topicPartition,
PulsarClusterInfo pulsarClusterInfo,
- int index,
- InlongStreamInfo streamInfo, int messagePosition) {
+ int index, InlongStreamInfo streamInfo, int messagePosition,
QueryMessageRequest request) {
List<BriefMQMessage> briefMQMessages = new ArrayList<>();
try {
ResponseEntity<byte[]> httpResponse =
@@ -462,8 +462,7 @@ public class PulsarOperator {
MessageWrapType.valueOf(Integer.parseInt(headers.get(InlongConstants.MSG_ENCODE_VER)));
}
DeserializeOperator deserializeOperator =
deserializeOperatorFactory.getInstance(messageWrapType);
- briefMQMessages.addAll(deserializeOperator.decodeMsg(streamInfo,
messageInfo.getBody(),
- headers, index));
+ deserializeOperator.decodeMsg(streamInfo, briefMQMessages,
messageInfo.getBody(), headers, index, request);
} catch (Exception e) {
LOGGER.warn("query message from pulsar error for groupId = {},
streamId = {}",
streamInfo.getInlongGroupId(),
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
index 6d86760ea4..efbdeabf20 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
@@ -36,6 +36,7 @@ import
org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.consume.InlongConsumeService;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
@@ -305,23 +306,23 @@ public class PulsarQueueResourceOperator implements
QueueResourceOperator {
/**
* Query latest message from pulsar
*/
- public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo,
- InlongStreamInfo streamInfo, Integer messageCount) throws
Exception {
+ public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo,
InlongStreamInfo streamInfo,
+ QueryMessageRequest request) throws Exception {
List<ClusterInfo> pulsarClusterList =
clusterService.listByTagAndType(groupInfo.getInlongClusterTag(),
ClusterType.PULSAR);
List<BriefMQMessage> briefMQMessages =
Collections.synchronizedList(new ArrayList<>());
- QueryCountDownLatch queryLatch = new QueryCountDownLatch(messageCount,
pulsarClusterList.size());
+ QueryCountDownLatch queryLatch = new
QueryCountDownLatch(request.getMessageCount(), pulsarClusterList.size());
InlongPulsarInfo inlongPulsarInfo = ((InlongPulsarInfo) groupInfo);
for (ClusterInfo clusterInfo : pulsarClusterList) {
QueryLatestMessagesRunnable task = new
QueryLatestMessagesRunnable(inlongPulsarInfo, streamInfo,
- (PulsarClusterInfo) clusterInfo, pulsarOperator,
messageCount, briefMQMessages, queryLatch);
+ (PulsarClusterInfo) clusterInfo, pulsarOperator, request,
briefMQMessages, queryLatch);
this.executor.execute(task);
}
queryLatch.await(30, TimeUnit.SECONDS);
log.info("success query pulsar message for groupId={}, streamId={}",
streamInfo.getInlongGroupId(),
streamInfo.getInlongStreamId());
- int finalMsgCount = Math.min(messageCount, briefMQMessages.size());
+ int finalMsgCount = Math.min(request.getMessageCount(),
briefMQMessages.size());
if (finalMsgCount > 0) {
return briefMQMessages.subList(0, finalMsgCount);
} else {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
index caf61e0ebe..4fb6b58e49 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
@@ -22,6 +22,7 @@ import
org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -37,7 +38,7 @@ public class QueryLatestMessagesRunnable implements Runnable {
private InlongStreamInfo streamInfo;
private PulsarClusterInfo clusterInfo;
private PulsarOperator pulsarOperator;
- private Integer messageCount;
+ private QueryMessageRequest queryMessageRequest;
private List<BriefMQMessage> briefMQMessages;
private QueryCountDownLatch latch;
@@ -45,14 +46,14 @@ public class QueryLatestMessagesRunnable implements
Runnable {
InlongStreamInfo streamInfo,
PulsarClusterInfo clusterInfo,
PulsarOperator pulsarOperator,
- Integer messageCount,
+ QueryMessageRequest queryMessageRequest,
List<BriefMQMessage> briefMQMessages,
QueryCountDownLatch latch) {
this.inlongPulsarInfo = inlongPulsarInfo;
this.streamInfo = streamInfo;
this.clusterInfo = clusterInfo;
this.pulsarOperator = pulsarOperator;
- this.messageCount = messageCount;
+ this.queryMessageRequest = queryMessageRequest;
this.briefMQMessages = briefMQMessages;
this.latch = latch;
}
@@ -69,7 +70,7 @@ public class QueryLatestMessagesRunnable implements Runnable {
String fullTopicName = tenant + "/" + namespace + "/" + topicName;
boolean serial =
InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
List<BriefMQMessage> messages =
- pulsarOperator.queryLatestMessage(clusterInfo, fullTopicName,
messageCount, streamInfo, serial);
+ pulsarOperator.queryLatestMessage(clusterInfo, fullTopicName,
queryMessageRequest, streamInfo, serial);
if (CollectionUtils.isNotEmpty(messages)) {
briefMQMessages.addAll(messages);
this.latch.countDown(messages.size());
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
index cdbc4da86d..050de07805 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
@@ -30,6 +30,7 @@ import
org.apache.inlong.manager.pojo.queue.tubemq.TubeHttpResponse;
import org.apache.inlong.manager.pojo.queue.tubemq.TubeMessageResponse;
import
org.apache.inlong.manager.pojo.queue.tubemq.TubeMessageResponse.TubeDataInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
import org.apache.inlong.manager.service.message.DeserializeOperator;
import org.apache.inlong.manager.service.message.DeserializeOperatorFactory;
@@ -269,7 +270,7 @@ public class TubeMQOperator {
* Query topic message for the given tubemq cluster.
*/
public List<BriefMQMessage> queryLastMessage(TubeClusterInfo tubeCluster,
String topicName,
- Integer msgCount, InlongStreamInfo streamInfo) {
+ InlongStreamInfo streamInfo, QueryMessageRequest request) {
LOGGER.info("begin to query message for topic {} in cluster: {}",
topicName, tubeCluster);
String masterUrl = tubeCluster.getMasterWebUrl();
TubeBrokerInfo brokerView = this.getBrokerInfo(masterUrl);
@@ -286,7 +287,8 @@ public class TubeMQOperator {
throw new BusinessException("TubeMQ master url or TubeMQ topic
cannot be null");
}
- String url = "http://" + brokerUrl + QUERY_MESSAGE_PATH +
TOPIC_NAME + topicName + MSG_COUNT + msgCount;
+ String url = "http://" + brokerUrl + QUERY_MESSAGE_PATH +
TOPIC_NAME + topicName + MSG_COUNT
+ + request.getMessageCount();
TubeMessageResponse response = HttpUtils.request(restTemplate,
url, HttpMethod.GET,
null, new HttpHeaders(), TubeMessageResponse.class);
if (response.getErrCode() != SUCCESS_CODE && response.getErrCode()
!= 200) {
@@ -310,7 +312,7 @@ public class TubeMQOperator {
}
byte[] messageData =
Base64.getDecoder().decode(tubeDataInfo.getData());
DeserializeOperator deserializeOperator =
deserializeOperatorFactory.getInstance(messageWrapType);
- messageList.addAll(deserializeOperator.decodeMsg(streamInfo,
messageData, map, index));
+ deserializeOperator.decodeMsg(streamInfo, messageList,
messageData, map, index, request);
}
LOGGER.info("success query messages for topic={}", topicName);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQQueueResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQQueueResourceOperator.java
index f6cf034e67..14dbd4f202 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQQueueResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQQueueResourceOperator.java
@@ -30,6 +30,7 @@ import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.consume.InlongConsumeService;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
@@ -132,14 +133,14 @@ public class TubeMQQueueResourceOperator implements
QueueResourceOperator {
}
public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo,
InlongStreamInfo streamInfo,
- Integer messageCount) {
+ QueryMessageRequest request) {
Preconditions.expectNotNull(groupInfo, "inlong group info cannot be
null");
String clusterTag = groupInfo.getInlongClusterTag();
TubeClusterInfo tubeCluster = (TubeClusterInfo)
clusterService.getOne(clusterTag, null, ClusterType.TUBEMQ);
String topicName = groupInfo.getMqResource();
- return tubeMQOperator.queryLastMessage(tubeCluster, topicName,
messageCount, streamInfo);
+ return tubeMQOperator.queryLastMessage(tubeCluster, topicName,
streamInfo, request);
}
@Override
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
index 96002d0ebd..013a6546b5 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
@@ -27,6 +27,7 @@ import
org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.user.UserInfo;
@@ -241,12 +242,10 @@ public interface InlongStreamService {
/**
* List brief mq message info
*
- * @param groupId inlong group id
- * @param streamId inlong stream id
- * @param messageCount Count of messages to query'
+ * @param request query message request
* @param operator operator
* @return list of brief mq message info
*/
- List<BriefMQMessage> listMessages(String groupId, String streamId, Integer
messageCount, String operator);
+ List<BriefMQMessage> listMessages(QueryMessageRequest request, String
operator);
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 46db20a04f..93692aeb28 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -55,6 +55,7 @@ import
org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.user.UserInfo;
import org.apache.inlong.manager.pojo.user.UserRoleCode;
@@ -948,15 +949,15 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
}
@Override
- public List<BriefMQMessage> listMessages(String groupId, String streamId,
Integer messageCount, String operator) {
- InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+ public List<BriefMQMessage> listMessages(QueryMessageRequest request,
String operator) {
+ InlongGroupEntity groupEntity =
groupMapper.selectByGroupId(request.getGroupId());
InlongGroupOperator instance =
groupOperatorFactory.getInstance(groupEntity.getMqType());
InlongGroupInfo groupInfo = instance.getFromEntity(groupEntity);
- InlongStreamInfo inlongStreamInfo = get(groupId, streamId);
+ InlongStreamInfo inlongStreamInfo = get(request.getGroupId(),
request.getStreamId());
List<BriefMQMessage> messageList = new ArrayList<>();
QueueResourceOperator queueOperator =
queueOperatorFactory.getInstance(groupEntity.getMqType());
try {
- messageList = queueOperator.queryLatestMessages(groupInfo,
inlongStreamInfo, messageCount);
+ messageList = queueOperator.queryLatestMessages(groupInfo,
inlongStreamInfo, request);
} catch (Exception e) {
LOGGER.error("query message error ", e);
}
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
index acc6ebb3f5..d0b88d0ec7 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
@@ -22,6 +22,7 @@ import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
import org.apache.inlong.manager.service.ServiceBaseTest;
import com.google.common.collect.Lists;
@@ -116,15 +117,18 @@ public class KafkaOperatorTest extends ServiceBaseTest {
@Test
void testGetKafkaLatestMessage() {
- List<BriefMQMessage> messages =
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, 10, streamInfo);
+ QueryMessageRequest request = new QueryMessageRequest();
+ request.setMessageCount(10);
+ List<BriefMQMessage> messages =
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, streamInfo, request);
Assertions.assertEquals(0, messages.size());
}
@Test
void testGetKafkaLatestMessage_1() {
addRecord(Collections.singletonList("inlong"));
-
- List<BriefMQMessage> messages =
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, 10, streamInfo);
+ QueryMessageRequest request = new QueryMessageRequest();
+ request.setMessageCount(10);
+ List<BriefMQMessage> messages =
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, streamInfo, request);
Assertions.assertEquals(1, messages.size());
Assertions.assertEquals("inlong", messages.get(0).getBody());
}
@@ -133,8 +137,9 @@ public class KafkaOperatorTest extends ServiceBaseTest {
void testGetKafkaLatestMessage_2() {
List<String> records = IntStream.range(0, 9).mapToObj(index -> "name_"
+ index).collect(Collectors.toList());
addRecord(records);
-
- List<BriefMQMessage> messages =
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, 10, streamInfo);
+ QueryMessageRequest request = new QueryMessageRequest();
+ request.setMessageCount(10);
+ List<BriefMQMessage> messages =
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, streamInfo, request);
Assertions.assertEquals(9, messages.size());
}
@@ -142,8 +147,9 @@ public class KafkaOperatorTest extends ServiceBaseTest {
void testGetKafkaLatestMessage_4() {
List<String> records = IntStream.range(0, 21).mapToObj(index ->
"name_" + index).collect(Collectors.toList());
addRecord(records);
-
- List<BriefMQMessage> messages =
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, 10, streamInfo);
+ QueryMessageRequest request = new QueryMessageRequest();
+ request.setMessageCount(10);
+ List<BriefMQMessage> messages =
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, streamInfo, request);
Assertions.assertEquals(10, messages.size());
}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
index ce2d9175c8..e5e0dd5343 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
@@ -33,6 +33,7 @@ import
org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.pojo.user.UserRoleCode;
@@ -255,15 +256,9 @@ public class InlongStreamController {
@RequestMapping(value = "/stream/listMessages", method = RequestMethod.GET)
@ApiOperation(value = "Get inlong stream message")
- @ApiImplicitParams({
- @ApiImplicitParam(name = "groupId", dataTypeClass = String.class,
required = true),
- @ApiImplicitParam(name = "streamId", dataTypeClass = String.class,
required = true),
- @ApiImplicitParam(name = "messageCount", dataTypeClass =
String.class, required = true)
- })
- public Response<List<BriefMQMessage>> listMessages(@RequestParam String
groupId, @RequestParam String streamId,
- @RequestParam Integer messageCount) {
+ public Response<List<BriefMQMessage>> listMessages(QueryMessageRequest
request) {
String username = LoginUserUtils.getLoginUser().getName();
- return Response.success(streamService.listMessages(groupId, streamId,
messageCount, username));
+ return Response.success(streamService.listMessages(request, username));
}
}