flowchartsman opened a new issue #461:
URL: https://github.com/apache/pulsar-client-go/issues/461
I have a producer set up like the following:
```go
client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL: "pulsar://localhost:6650",
})
if err != nil {
	log.Fatal(err)
}
producer, err := client.CreateProducer(pulsar.ProducerOptions{
	Topic: "tn/ns/topic",
})
if err != nil {
	log.Fatal(err)
}
defer client.Close()
ticker := time.NewTicker(300 * time.Millisecond)
defer ticker.Stop()
ctx, cFunc := context.WithCancel(context.Background())
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, os.Kill)
go func() {
	<-sig
	cFunc()
}()
MainLoop:
for {
	select {
	case <-ctx.Done():
		break MainLoop
	case <-ticker.C:
		// continue on
	}
	a := Article{
		//fields here
	}
	b, _ := json.Marshal(a)
	_, err := producer.Send(ctx, &pulsar.ProducerMessage{
		Payload: b,
	})
	if err != nil {
		log.Println("error producing message:", err)
	}
}
```
If the broker goes away or I spin down the (standalone local) cluster, the
producer enters a loop like the following and becomes unresponsive to SIGINT.
The process then has to be killed:
```
INFO[29795] [Connection closed] remote_addr="pulsar://localhost:6650"
INFO[29803] [Connecting to broker] remote_addr="pulsar://localhost:6650"
WARN[29803] [Failed to connect to broker.] error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
INFO[29803] [Connection closed] remote_addr="pulsar://localhost:6650"
INFO[29819] [Connecting to broker] remote_addr="pulsar://localhost:6650"
WARN[29819] [Failed to connect to broker.] error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
INFO[29819] [Connection closed] remote_addr="pulsar://localhost:6650"
WARN[29819] [Failed to lookup topic] error="connection error" producerID=1 producer_name=standalone-0-0 topic="persistent://tn/ns/topic"
INFO[29819] [Reconnecting to broker in 1m0s] producerID=1 producer_name=standalone-0-0 topic="persistent://tn/ns/topic"
INFO[29820] [Connecting to broker] remote_addr="pulsar://localhost:6650"
WARN[29820] [Failed to connect to broker.] error="dial tcp [::1]:6650: connect: connection refused" remote_addr="pulsar://localhost:6650"
```
This suggests that `Send` gets into a state where it ignores the context. This
_may_ be occurring in **producer_partition.go**, but I haven't had the time to
research it yet.
```go
func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
	callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
	sr := &sendRequest{
		ctx:              ctx,
		msg:              msg,
		callback:         callback,
		flushImmediately: flushImmediately,
		publishTime:      time.Now(),
	}
	p.options.Interceptors.BeforeSend(p, msg)
	if p.options.DisableBlockIfQueueFull {
		if !p.publishSemaphore.TryAcquire() {
			if callback != nil {
				callback(nil, msg, errSendQueueIsFull)
			}
			return
		}
	} else {
		p.publishSemaphore.Acquire()
	}
	p.metrics.MessagesPending.Inc()
	p.metrics.BytesPending.Add(float64(len(sr.msg.Payload)))
	p.eventsChan <- sr // <- Here? Select clause with ctx.Done?
}
```