healchow commented on code in PR #4564:
URL: https://github.com/apache/incubator-inlong/pull/4564#discussion_r891348718


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java:
##########
@@ -156,19 +155,20 @@ private void createPulsarSubscription(PulsarAdmin 
pulsarAdmin, String subscripti
     /**
      * Create tube consumer group
      */
-    private void createTubeConsumerGroup(ConsumptionEntity consumption) {
-        AddTubeConsumeGroupRequest addTubeConsumeGroupRequest = new 
AddTubeConsumeGroupRequest();
-        addTubeConsumeGroupRequest.setClusterId(1); // TODO is cluster id 
needed?
-        addTubeConsumeGroupRequest.setCreateUser(consumption.getCreator());
-        AddTubeConsumeGroupRequest.GroupNameJsonSetBean bean = new 
AddTubeConsumeGroupRequest.GroupNameJsonSetBean();
-        bean.setTopicName(consumption.getTopic());
-        bean.setGroupName(consumption.getConsumerGroup());
-        
addTubeConsumeGroupRequest.setGroupNameJsonSet(Collections.singletonList(bean));
+    private void createTubeConsumerGroup(ConsumptionEntity entity, String 
operator) {
+        String groupId = entity.getInlongGroupId();
+        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+        Preconditions.checkNotNull(groupEntity, "inlong group not found for 
groupId=" + groupId);
+        String mqResource = groupEntity.getMqResource();
+        Preconditions.checkNotNull(mqResource, "mq resource cannot empty for 
groupId=" + groupId);
 
+        String clusterTag = groupEntity.getInlongClusterTag();
+        InlongClusterInfo clusterInfo = clusterService.getOne(clusterTag, 
null, ClusterType.CLS_TUBE);
         try {
-            
tubeMqOptService.createNewConsumerGroup(addTubeConsumeGroupRequest);
+            tubeMQOperator.createConsumerGroup((TubeClusterInfo) clusterInfo, 
entity.getTopic(),
+                    entity.getConsumerGroup(), operator);
         } catch (Exception e) {
-            throw new WorkflowListenerException("failed to create tube 
consumer group: " + addTubeConsumeGroupRequest);
+            throw new WorkflowListenerException("failed to create tube 
consumer group: " + e.getMessage());

Review Comment:
   Good idea. Print the stack trace to the log.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to