jujiale opened a new issue, #1362:
URL: https://github.com/apache/pulsar-client-go/issues/1362
#### Expected behavior
Consumers under the same namespace are not blocked.
#### Actual behavior
Consumers under the same namespace are all blocked.
#### Steps to reproduce
1. Create several Pulsar topics under the same namespace, each with
consumers, and send messages to these topics continuously.
2. Create a consumer configured with a **retry** queue and a **dlq** queue,
using the code below.
`
c, err := pulsarClient.Subscribe(pulsar.ConsumerOptions{
	Topic: common.GetEventbridgeRuleTopicNameV2(target.Rule.Eventbridge.Name,
		target.Rule.Eventbridge.ID, target.Rule.Name, target.Rule.ID),
	SubscriptionName:  common.GetEventbridgeTargetSubscriptionNameV2(target.Name, target.ID),
	Type:              pulsar.Shared,
	RetryEnable:       true,
	NackBackoffPolicy: ExponentialNackBackoffRetryPolicy{},
	DLQ: &pulsar.DLQPolicy{
		MaxDeliveries: 20,
		// The DLQ topic name is derived from the rule topic name.
		DeadLetterTopic: common.GetEventbridgeTargetDLQTopicNameV2(
			common.GetEventbridgeRuleTopicNameV2(target.Rule.Eventbridge.Name,
				target.Rule.Eventbridge.ID, target.Rule.Name, target.Rule.ID),
			target.Name,
			target.ID,
		),
		// The retry topic name is derived from the rule topic name as well.
		RetryLetterTopic: common.GetEventbridgeTargetRetryTopicNameV2(
			common.GetEventbridgeRuleTopicNameV2(target.Rule.Eventbridge.Name,
				target.Rule.Eventbridge.ID, target.Rule.Name, target.Rule.ID),
			target.Name,
			target.ID,
		),
		InitSubscriptionName: "default",
	},
})
`
The **common** package is shown below; the logic is very simple.
`
package common

import "fmt"

// GetEventbridgeRuleTopicNameV2 builds the full topic name for a rule.
func GetEventbridgeRuleTopicNameV2(eventbridgeName string, eventbridgeId int64, ruleName string, ruleId int64) string {
	return fmt.Sprintf("persistent://eventbridge/%s-%d/rule-%s-%d",
		eventbridgeName, eventbridgeId, ruleName, ruleId)
}

// GetEventbridgeTargetSubscriptionNameV2 builds the subscription name for a target.
func GetEventbridgeTargetSubscriptionNameV2(targetName string, targetId int64) string {
	return fmt.Sprintf("target-%s-%d", targetName, targetId)
}

// GetEventbridgeTargetDLQTopicNameV2 builds the dead-letter topic name for a target.
func GetEventbridgeTargetDLQTopicNameV2(topic string, targetName string, targetId int64) string {
	return fmt.Sprintf("%s-target-%s-%d-DLQ", topic, targetName, targetId)
}

// GetEventbridgeTargetRetryTopicNameV2 builds the retry topic name for a target.
func GetEventbridgeTargetRetryTopicNameV2(topic string, targetName string, targetId int64) string {
	return fmt.Sprintf("%s-target-%s-%d-RETRY", topic, targetName, targetId)
}
`
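To make the naming concrete, here is a small standalone sketch that runs the helpers above on made-up identifiers (`demo`, `orders`, `http` and the numeric IDs are hypothetical, not from the report). It suggests that the rule topic, the retry topic and the DLQ topic all end up in the same tenant and namespace.

```go
package main

import "fmt"

func GetEventbridgeRuleTopicNameV2(eventbridgeName string, eventbridgeId int64, ruleName string, ruleId int64) string {
	return fmt.Sprintf("persistent://eventbridge/%s-%d/rule-%s-%d",
		eventbridgeName, eventbridgeId, ruleName, ruleId)
}

func GetEventbridgeTargetSubscriptionNameV2(targetName string, targetId int64) string {
	return fmt.Sprintf("target-%s-%d", targetName, targetId)
}

func GetEventbridgeTargetDLQTopicNameV2(topic string, targetName string, targetId int64) string {
	return fmt.Sprintf("%s-target-%s-%d-DLQ", topic, targetName, targetId)
}

func GetEventbridgeTargetRetryTopicNameV2(topic string, targetName string, targetId int64) string {
	return fmt.Sprintf("%s-target-%s-%d-RETRY", topic, targetName, targetId)
}

func main() {
	// Hypothetical identifiers, chosen only for illustration.
	topic := GetEventbridgeRuleTopicNameV2("demo", 1, "orders", 2)
	fmt.Println(topic)
	// persistent://eventbridge/demo-1/rule-orders-2
	fmt.Println(GetEventbridgeTargetSubscriptionNameV2("http", 3))
	// target-http-3
	fmt.Println(GetEventbridgeTargetDLQTopicNameV2(topic, "http", 3))
	// persistent://eventbridge/demo-1/rule-orders-2-target-http-3-DLQ
	fmt.Println(GetEventbridgeTargetRetryTopicNameV2(topic, "http", 3))
	// persistent://eventbridge/demo-1/rule-orders-2-target-http-3-RETRY
}
```

In this example the tenant is `eventbridge` and the namespace is `demo-1` for all three topics.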
3. After the consumer receives a message, we assume that handling it fails
and call **unAck()** on the message directly.
4. The producer keeps sending messages to the topic at a very high rate.
After some time, the consumer can no longer consume any messages, and
consumers of other topics under the same namespace can never consume any
messages either (even though the message flow on those topics is very small).
We see this problem with pulsar-client-go 0.12.0, but it disappears when we
use 0.15.0.
I would like to know why 0.12.0 has this problem and whether there is an
existing issue linked to it.
In the Pulsar configuration, all unacked-message settings are set to 0.

#### System configuration
**Pulsar Server version**: both 2.8.1 and 3.0.6 have this problem
**Pulsar Go Client version**: 0.12.0