flowchartsman opened a new issue #461:
URL: https://github.com/apache/pulsar-client-go/issues/461


   I have a producer set up like the following:
   
   ```go
   client, err := pulsar.NewClient(pulsar.ClientOptions{
       URL: "pulsar://localhost:6650",
   })
   if err != nil {
       log.Fatal(err)
   }

   producer, err := client.CreateProducer(pulsar.ProducerOptions{
       Topic: "tn/ns/topic",
   })
   if err != nil {
       log.Fatal(err)
   }

   defer client.Close()

   ticker := time.NewTicker(300 * time.Millisecond)
   defer ticker.Stop()

   // Cancel the context on Ctrl-C so the loop below can exit.
   ctx, cFunc := context.WithCancel(context.Background())
   sig := make(chan os.Signal, 1)
   // os.Kill (SIGKILL) cannot be trapped, so only os.Interrupt has any effect here.
   signal.Notify(sig, os.Interrupt, os.Kill)
   go func() {
       <-sig
       cFunc()
   }()

   MainLoop:
   for {
       select {
       case <-ctx.Done():
           break MainLoop
       case <-ticker.C:
           // continue on
       }
       a := Article{
           //fields here
       }
       b, _ := json.Marshal(a)
       // Send receives the cancellable context, so it is expected to return on SIGINT.
       _, err := producer.Send(ctx, &pulsar.ProducerMessage{
           Payload: b,
       })
       if err != nil {
           log.Println("error producing message:", err)
       }
   }
   ```
   
   If the broker goes away or I spin down the (standalone local) cluster, the 
producer enters a reconnect loop like the following and stops responding to 
SIGINT. The process must be killed:
   
   ```
   INFO[29795] [Connection closed]                           remote_addr="pulsar://localhost:6650"
   INFO[29803] [Connecting to broker]                        remote_addr="pulsar://localhost:6650"
   WARN[29803] [Failed to connect to broker.]                error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
   INFO[29803] [Connection closed]                           remote_addr="pulsar://localhost:6650"
   INFO[29819] [Connecting to broker]                        remote_addr="pulsar://localhost:6650"
   WARN[29819] [Failed to connect to broker.]                error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
   INFO[29819] [Connection closed]                           remote_addr="pulsar://localhost:6650"
   WARN[29819] [Failed to lookup topic]                      error="connection error" producerID=1 producer_name=standalone-0-0 topic="persistent://tn/ns/topic"
   INFO[29819] [Reconnecting to broker in  1m0s]             producerID=1 producer_name=standalone-0-0 topic="persistent://tn/ns/topic"
   INFO[29820] [Connecting to broker]                        remote_addr="pulsar://localhost:6650"
   WARN[29820] [Failed to connect to broker.]                error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
   ```
   
   This suggests that `Send` gets into a state where it ignores the context. 
This _may_ be occurring in **producer_partition.go**, but I haven't had the 
time to research it yet.
   
   ```go
   func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
        callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
        sr := &sendRequest{
                ctx:              ctx,
                msg:              msg,
                callback:         callback,
                flushImmediately: flushImmediately,
                publishTime:      time.Now(),
        }
        p.options.Interceptors.BeforeSend(p, msg)
   
        if p.options.DisableBlockIfQueueFull {
                if !p.publishSemaphore.TryAcquire() {
                        if callback != nil {
                                callback(nil, msg, errSendQueueIsFull)
                        }
                        return
                }
        } else {
                p.publishSemaphore.Acquire()
        }
   
        p.metrics.MessagesPending.Inc()
        p.metrics.BytesPending.Add(float64(len(sr.msg.Payload)))
   
        p.eventsChan <- sr  // <- Here? Select clause with ctx.Done?
   }
   ```
   
   

