flowchartsman opened a new issue #432:
URL: https://github.com/apache/pulsar-client-go/issues/432
#### Steps to reproduce
Initialize a standalone cluster and create some entities:
```
$ pulsarctl tenant create -c standalone testtenant
$ pulsarctl namespace create testtenant/logs
```
Create a client with the following code (I have it in a directory called
`rexconsumer`):
```go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)
// TopicWildcardName returns a fully-qualified pulsar topic string
func TopicWildcardName(tenant, namespace string) string {
return fmt.Sprintf("persistent://%s/%s/*", tenant, namespace)
}
func main() {
log.SetFlags(0)
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
topicpattern := TopicWildcardName(os.Args[1], os.Args[2])
log.Println(topicpattern)
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
TopicsPattern: topicpattern,
SubscriptionName: "regexconsumer",
Type: pulsar.Exclusive,
SubscriptionInitialPosition:
pulsar.SubscriptionPositionEarliest,
AutoDiscoveryPeriod: time.Second,
})
ctx, cFunc := context.WithCancel(context.Background())
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, os.Kill)
go func() {
<-sig
cFunc()
}()
MainLoop:
for {
select {
case <-ctx.Done():
log.Println("closing gracefully")
consumer.Close()
break MainLoop
default:
// get message
}
msg, err := consumer.Receive(ctx)
if err != nil {
log.Println("consumer receive error:", err)
continue
}
log.Printf("Payload:\n%s",string(msg.Payload()))
consumer.Ack(msg)
}
}
```
start the consumer:
```
$ ./rexconsumer testtenant logs
persistent://testtenant/logs/*
INFO[0000] [Connecting to broker]
remote_addr="pulsar://localhost:6650"
INFO[0000] [TCP connection established]
local_addr="[::1]:63062" remote_addr="pulsar://localhost:6650"
INFO[0000] [Connection is ready]
```
Create some topics and verify they are there:
```
$ pulsarctl topics create testtenant/logs/topic1 0
Create topic persistent://testtenant/logs/topic1 with 0 partitions
successfully
$ pulsarctl topics create testtenant/logs/topic2 0
Create topic persistent://testtenant/logs/topic2 with 0 partitions
successfully
$ pulsarctl topics list testtenant/logs
+-------------------------------------+---------------+
| TOPIC NAME | PARTITIONED ? |
+-------------------------------------+---------------+
| persistent://testtenant/logs/topic1 | N |
| persistent://testtenant/logs/topic2 | N |
+-------------------------------------+---------------+
```
Now force-delete one of the topics:
```
pulsarctl topics delete -f -n persistent://testtenant/logs/topic1
Delete topic persistent://testtenant/logs/topic1 successfully
```
In the consumer you will see:
```
INFO[0430] Broker notification of Closed consumer: [1]
local_addr="[::1]:63062" remote_addr="pulsar://localhost:6650"
INFO[0430] [Reconnecting to broker in 100ms] consumerID=1
name=wfidv subscription=regexconsumer
topic="persistent://testtenant/logs/topic1"
INFO[0430] [Connected consumer] consumerID=1
name=wfidv subscription=regexconsumer
topic="persistent://testtenant/logs/topic1"
INFO[0430] [Reconnected consumer to broker] consumerID=1
name=wfidv subscription=regexconsumer
topic="persistent://testtenant/logs/topic1"
```
And the topic is still there:
```
$ pulsarctl topics list testtenant/logs
+-------------------------------------+---------------+
| TOPIC NAME | PARTITIONED ? |
+-------------------------------------+---------------+
| persistent://testtenant/logs/topic1 | N |
| persistent://testtenant/logs/topic2 | N |
+-------------------------------------+---------------+
```
#### Expected behavior
My understanding from #226 is that regex/pattern consumers should be
applying this flag so that they will not do this. The topic should not be
re-created.
#### Actual behavior
As above. Topic is re-created.
#### System configuration
**Pulsar version**: 2.8.0-SNAPSHOT (head)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]