healchow commented on code in PR #5837:
URL: https://github.com/apache/inlong/pull/5837#discussion_r966556356
##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java:
##########
@@ -90,4 +90,7 @@ public class SourceRequest {
@ApiModelProperty("Other properties if needed")
private Map<String, Object> properties = new LinkedHashMap<>();
+ @ApiModelProperty("Sub source information of existing agents")
Review Comment:
Add the `@JsonIgnore` annotation.
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java:
##########
@@ -78,13 +78,11 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
@Transactional(rollbackFor = Throwable.class)
public Integer saveOpt(SourceRequest request, Integer groupStatus, String operator) {
StreamSourceEntity entity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new);
- if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
- if (request.getSourceType().equals(SourceType.AUTO_PUSH)) {
- // auto push task needs not be issued to agent
- entity.setStatus(SourceStatus.SOURCE_NORMAL.getCode());
- } else {
- entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
- }
+ if (request.getSourceType().equals(SourceType.AUTO_PUSH)) {
Review Comment:
Suggest changing this to `SourceType.AUTO_PUSH.equals(request.getSourceType())`, which stays safe if `getSourceType()` returns null.
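
A minimal sketch of why the constant-first comparison suggested above is safer. The `SourceType` enum below is a hypothetical stand-in used only for illustration, not InLong's actual class, and the helper method names are assumptions.

```java
public class ConstantFirstEquals {

    // Hypothetical stand-in for the real SourceType; only the shape matters here.
    public enum SourceType { AUTO_PUSH, FILE }

    // Calls equals() on the value under test, so a null value throws NullPointerException.
    public static boolean isAutoPushUnsafe(SourceType sourceType) {
        return sourceType.equals(SourceType.AUTO_PUSH);
    }

    // Calls equals() on the constant, so a null value simply yields false.
    public static boolean isAutoPushSafe(SourceType sourceType) {
        return SourceType.AUTO_PUSH.equals(sourceType);
    }

    public static void main(String[] args) {
        System.out.println(isAutoPushSafe(SourceType.AUTO_PUSH));
        System.out.println(isAutoPushSafe(null));
        try {
            isAutoPushUnsafe(null);
        } catch (NullPointerException e) {
            System.out.println("unsafe variant threw NullPointerException");
        }
    }
}
```

Both variants behave the same for non-null input; they differ only when the request carries no source type.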
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java:
##########
@@ -99,15 +99,11 @@ protected void operateStreamSources(String groupId, String streamId, String oper
*/
@SneakyThrows
public boolean checkIfOp(StreamSource streamSource, List<StreamSource> unOperatedSources) {
- // if a source has sub-sources, it is considered a template source.
- // template sources do not need to be operated, its sub-sources will be processed in this method later.
- if (CollectionUtils.isNotEmpty(streamSource.getSubSourceList())) {
- return false;
- }
for (int retry = 0; retry < 60; retry++) {
int status = streamSource.getStatus();
SourceStatus sourceStatus = SourceStatus.forCode(status);
- if (sourceStatus == SourceStatus.SOURCE_NORMAL || sourceStatus == SourceStatus.SOURCE_FROZEN) {
Review Comment:
Suggest adding a comment that notes where the sub-sources need to be operated.
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java:
##########
@@ -215,15 +216,15 @@ private List<DataConfig> fetchFileTasks(TaskRequest taskRequest) {
Preconditions.checkTrue(StringUtils.isNotBlank(agentIp) || StringUtils.isNotBlank(agentClusterName),
"both agent ip and cluster name are blank when fetching file task");
List<StreamSourceEntity> sourceEntities = sourceMapper.selectByAgentIpOrCluster(needAddStatusList,
- Lists.newArrayList(SourceType.FILE), agentIp, agentClusterName,TASK_FETCH_SIZE * 10);
+ Lists.newArrayList(SourceType.FILE), agentIp, agentClusterName,UNLIMITED_FETCH_SIZE);
Review Comment:
Suggest removing the limit from the SQL rather than passing `UNLIMITED_FETCH_SIZE`.