flowchartsman opened a new issue #432:
URL: https://github.com/apache/pulsar-client-go/issues/432


   #### Steps to reproduce
   
   Initialize a standalone cluster and create some entities:
   
   ```
   $ pulsarctl tenant create -c standalone testtenant
   $ pulsarctl namespace create testtenant/logs
   
   ```
   
   Create a client with the following code (I have it in a directory called 
`rexconsumer`):
   ```go
   package main
   import (
        "context"
        "encoding/json"
        "fmt"
        "log"
        "os"
        "os/signal"
        "time"
        "github.com/apache/pulsar-client-go/pulsar"
   )
   // TopicWildcardName returns a fully-qualified pulsar topic string
   func TopicWildcardName(tenant, namespace string) string {
        return fmt.Sprintf("persistent://%s/%s/*", tenant, namespace)
   }
   func main() {
        log.SetFlags(0)
        client, err := pulsar.NewClient(pulsar.ClientOptions{
                URL: "pulsar://localhost:6650",
        })
        if err != nil {
                log.Fatal(err)
        }
        defer client.Close()
        topicpattern := TopicWildcardName(os.Args[1], os.Args[2])
        log.Println(topicpattern)
        consumer, err := client.Subscribe(pulsar.ConsumerOptions{
                TopicsPattern:               topicpattern,
                SubscriptionName:            "regexconsumer",
                Type:                        pulsar.Exclusive,
                SubscriptionInitialPosition: 
pulsar.SubscriptionPositionEarliest,
                AutoDiscoveryPeriod:         time.Second,
        })
        ctx, cFunc := context.WithCancel(context.Background())
        sig := make(chan os.Signal, 1)
        signal.Notify(sig, os.Interrupt, os.Kill)
        go func() {
                <-sig
                cFunc()
        }()
   MainLoop:
        for {
                select {
                case <-ctx.Done():
                        log.Println("closing gracefully")
                        consumer.Close()
                        break MainLoop
                default:
                        // get message
                }
                msg, err := consumer.Receive(ctx)
                if err != nil {
                        log.Println("consumer receive error:", err)
                        continue
                }
                   log.Printf("Payload:\n%s",string(msg.Payload()))
                consumer.Ack(msg)
        }
   }
   ```
   
   start the consumer:
   ```
   $ ./rexconsumer testtenant logs
   persistent://testtenant/logs/*
   INFO[0000] [Connecting to broker]                        
remote_addr="pulsar://localhost:6650"
   INFO[0000] [TCP connection established]                  
local_addr="[::1]:63062" remote_addr="pulsar://localhost:6650"
   INFO[0000] [Connection is ready]
   ```
   
   Create some topics and verify they are there:
   
   ```
   $ pulsarctl topics create testtenant/logs/topic1 0
   Create topic persistent://testtenant/logs/topic1 with 0 partitions 
successfully
   $ pulsarctl topics create testtenant/logs/topic2 0
   Create topic persistent://testtenant/logs/topic2 with 0 partitions 
successfully
   $ pulsarctl topics list testtenant/logs
   +-------------------------------------+---------------+
   |             TOPIC NAME              | PARTITIONED ? |
   +-------------------------------------+---------------+
   | persistent://testtenant/logs/topic1 | N             |
   | persistent://testtenant/logs/topic2 | N             |
   +-------------------------------------+---------------+
   ```
   
   Now force-delete one of the topics:
   
   ```
   pulsarctl topics delete -f -n persistent://testtenant/logs/topic1 
   Delete topic persistent://testtenant/logs/topic1 successfully
   ```
   
   In the consumer you will see:
   ```
   INFO[0430] Broker notification of Closed consumer: [1]   
local_addr="[::1]:63062" remote_addr="pulsar://localhost:6650"
   INFO[0430] [Reconnecting to broker in  100ms]            consumerID=1 
name=wfidv subscription=regexconsumer 
topic="persistent://testtenant/logs/topic1"
   INFO[0430] [Connected consumer]                          consumerID=1 
name=wfidv subscription=regexconsumer 
topic="persistent://testtenant/logs/topic1"
   INFO[0430] [Reconnected consumer to broker]              consumerID=1 
name=wfidv subscription=regexconsumer 
topic="persistent://testtenant/logs/topic1"
   ```
   
   And the topic is still there:
   ```
   $ pulsarctl topics list testtenant/logs
   +-------------------------------------+---------------+
   |             TOPIC NAME              | PARTITIONED ? |
   +-------------------------------------+---------------+
   | persistent://testtenant/logs/topic1 | N             |
   | persistent://testtenant/logs/topic2 | N             |
   +-------------------------------------+---------------+
   
   ```
   
   #### Expected behavior
   
   My understanding from #226 is that regex/pattern consumers should be 
applying this flag so that they will not do this. The topic should not be 
re-created.
   
   #### Actual behavior
   
   As above. Topic is re-created.
   
   #### System configuration
   
   **Pulsar version**: 2.8.0-SNAPSHOT (head)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to