fuweng11 commented on code in PR #7474:
URL: https://github.com/apache/inlong/pull/7474#discussion_r1122742111


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java:
##########
@@ -471,70 +471,68 @@ private DataConfig getDataConfig(StreamSourceEntity 
entity, int op) {
         dataConfig.setInlongStreamId(streamId);
 
         InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
-        if (groupEntity == null) {
-            throw new BusinessException(String.format("inlong group not found 
for groupId=%s", groupId));
-        }
         InlongStreamEntity streamEntity = 
streamMapper.selectByIdentifier(groupId, streamId);
-        if (streamEntity == null) {
-            throw new BusinessException(
-                    String.format("inlong stream not found for groupId=%s 
streamId=%s", groupId, streamId));
-        }
-
-        String extParams = entity.getExtParams();
-        dataConfig.setSyncSend(streamEntity.getSyncSend());
-        if (SourceType.FILE.equalsIgnoreCase(streamEntity.getDataType())) {
-            String dataSeparator = streamEntity.getDataSeparator();
-            extParams = (null != dataSeparator ? getExtParams(extParams, 
dataSeparator) : extParams);
-        }
-        dataConfig.setExtParams(extParams);
-
-        int dataReportType = groupEntity.getDataReportType();
-        dataConfig.setDataReportType(dataReportType);
-        if (InlongConstants.REPORT_TO_MQ_RECEIVED == dataReportType) {
-            // add mq cluster setting
-            List<MQClusterInfo> mqSet = new ArrayList<>();
-            List<String> clusterTagList = 
Collections.singletonList(groupEntity.getInlongClusterTag());
-            List<String> typeList = Arrays.asList(ClusterType.TUBEMQ, 
ClusterType.PULSAR);
-            ClusterPageRequest pageRequest = ClusterPageRequest.builder()
-                    .typeList(typeList)
-                    .clusterTagList(clusterTagList)
-                    .build();
-            List<InlongClusterEntity> mqClusterList = 
clusterMapper.selectByCondition(pageRequest);
-            for (InlongClusterEntity cluster : mqClusterList) {
-                MQClusterInfo clusterInfo = new MQClusterInfo();
-                clusterInfo.setUrl(cluster.getUrl());
-                clusterInfo.setToken(cluster.getToken());
-                clusterInfo.setMqType(cluster.getType());
-                
clusterInfo.setParams(JsonUtils.parseObject(cluster.getExtParams(), 
HashMap.class));
-                mqSet.add(clusterInfo);
+        if (groupEntity != null && streamEntity != null) {
+            String extParams = entity.getExtParams();
+            dataConfig.setSyncSend(streamEntity.getSyncSend());
+            if (SourceType.FILE.equalsIgnoreCase(streamEntity.getDataType())) {
+                String dataSeparator = streamEntity.getDataSeparator();
+                extParams = (null != dataSeparator ? getExtParams(extParams, 
dataSeparator) : extParams);
             }
-            dataConfig.setMqClusters(mqSet);
-
-            // add topic setting
-            String mqResource = groupEntity.getMqResource();
-            String mqType = groupEntity.getMqType();
-            if (MQType.PULSAR.equals(mqType) || 
MQType.TDMQ_PULSAR.equals(mqType)) {
-                // first get the tenant from the InlongGroup, and then get it 
from the PulsarCluster.
-                InlongPulsarDTO pulsarDTO = 
InlongPulsarDTO.getFromJson(groupEntity.getExtParams());
-                String tenant = pulsarDTO.getTenant();
-                if (StringUtils.isBlank(tenant)) {
-                    // If there are multiple Pulsar clusters, take the first 
one.
-                    // Note that the tenants in multiple Pulsar clusters must 
be identical.
-                    PulsarClusterDTO pulsarCluster = 
PulsarClusterDTO.getFromJson(mqClusterList.get(0).getExtParams());
-                    tenant = pulsarCluster.getTenant();
+            dataConfig.setExtParams(extParams);
+
+            int dataReportType = groupEntity.getDataReportType();
+            dataConfig.setDataReportType(dataReportType);
+            if (InlongConstants.REPORT_TO_MQ_RECEIVED == dataReportType) {
+                // add mq cluster setting
+                List<MQClusterInfo> mqSet = new ArrayList<>();
+                List<String> clusterTagList = 
Collections.singletonList(groupEntity.getInlongClusterTag());
+                List<String> typeList = Arrays.asList(ClusterType.TUBEMQ, 
ClusterType.PULSAR);
+                ClusterPageRequest pageRequest = ClusterPageRequest.builder()
+                        .typeList(typeList)
+                        .clusterTagList(clusterTagList)
+                        .build();
+                List<InlongClusterEntity> mqClusterList = 
clusterMapper.selectByCondition(pageRequest);
+                for (InlongClusterEntity cluster : mqClusterList) {
+                    MQClusterInfo clusterInfo = new MQClusterInfo();
+                    clusterInfo.setUrl(cluster.getUrl());
+                    clusterInfo.setToken(cluster.getToken());
+                    clusterInfo.setMqType(cluster.getType());
+                    
clusterInfo.setParams(JsonUtils.parseObject(cluster.getExtParams(), 
HashMap.class));
+                    mqSet.add(clusterInfo);
                 }
-
-                String topic = 
String.format(InlongConstants.PULSAR_TOPIC_FORMAT,
-                        tenant, mqResource, streamEntity.getMqResource());
-                DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
-                topicConfig.setInlongGroupId(groupId + "/" + streamId);
-                topicConfig.setTopic(topic);
-                dataConfig.setTopicInfo(topicConfig);
-            } else if (MQType.TUBEMQ.equals(mqType)) {
-                DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
-                topicConfig.setInlongGroupId(groupId);
-                topicConfig.setTopic(mqResource);
-                dataConfig.setTopicInfo(topicConfig);
+                dataConfig.setMqClusters(mqSet);
+
+                // add topic setting
+                String mqResource = groupEntity.getMqResource();
+                String mqType = groupEntity.getMqType();
+                if (MQType.PULSAR.equals(mqType) || 
MQType.TDMQ_PULSAR.equals(mqType)) {
+                    // first get the tenant from the InlongGroup, and then get 
it from the PulsarCluster.
+                    InlongPulsarDTO pulsarDTO = 
InlongPulsarDTO.getFromJson(groupEntity.getExtParams());
+                    String tenant = pulsarDTO.getTenant();
+                    if (StringUtils.isBlank(tenant)) {
+                        // If there are multiple Pulsar clusters, take the 
first one.
+                        // Note that the tenants in multiple Pulsar clusters 
must be identical.
+                        PulsarClusterDTO pulsarCluster = 
PulsarClusterDTO.getFromJson(
+                                mqClusterList.get(0).getExtParams());
+                        tenant = pulsarCluster.getTenant();
+                    }
+
+                    String topic = 
String.format(InlongConstants.PULSAR_TOPIC_FORMAT,
+                            tenant, mqResource, streamEntity.getMqResource());
+                    DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
+                    topicConfig.setInlongGroupId(groupId + "/" + streamId);
+                    topicConfig.setTopic(topic);
+                    dataConfig.setTopicInfo(topicConfig);
+                } else if (MQType.TUBEMQ.equals(mqType)) {
+                    DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
+                    topicConfig.setInlongGroupId(groupId);
+                    topicConfig.setTopic(mqResource);
+                    dataConfig.setTopicInfo(topicConfig);
+                }
+            } else {
+                dataConfig.setSyncSend(0);

Review Comment:
   Setting this to 0 was unnecessary, so I have removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to