wuYin opened a new issue #458:
URL: https://github.com/apache/pulsar-client-go/issues/458


   #### Expected behavior
   
   Going by its name, `MaxReconnectToBroker` should cap reconnection attempts: 
once the producer has tried to reconnect to the broker more than 
`MaxReconnectToBroker` times, it should report an error and close, or otherwise 
signal the failure. Instead it does nothing, and every send then fails with 
sendTimeout indefinitely. Even after the broker recovers, sends keep failing.
   
   
![image](https://user-images.githubusercontent.com/24536920/106605500-ef0ee880-659b-11eb-9e3b-e5e00ea4617a.png)
   
    
   
   
   #### Actual behavior
   
   Infinite sendTimeout
   
   
![image](https://user-images.githubusercontent.com/24536920/106606197-ce935e00-659c-11eb-8dcf-31a234177718.png)
   
   
   #### Steps to reproduce
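   - Start the broker.
   - Start the producer.
   - Shut down the broker.
   - Wait until the producer's reconnection attempts exceed 3 (the configured 
`MaxReconnectToBroker`).
   - Every send now fails indefinitely, because the producer keeps using a 
closed connection that is never recovered.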
   ```go
   package main

   import (
        "context"
        "log"
        "time"

        "github.com/apache/pulsar-client-go/pulsar"
   )

   func main() {
        client, err := pulsar.NewClient(pulsar.ClientOptions{
                URL: "pulsar://localhost:6650",
        })
        if err != nil {
                log.Fatal(err)
        }
        defer client.Close()

        // Give up reconnecting after 3 attempts.
        max := uint(3)
        producer, err := client.CreateProducer(pulsar.ProducerOptions{
                Topic:                "topic-1",
                SendTimeout:          3 * time.Second,
                MaxReconnectToBroker: &max,
        })
        if err != nil {
                log.Fatal(err)
        }
        defer producer.Close()

        ctx := context.TODO()
        // 256 concurrent senders, each sending one message per second.
        for i := 0; i < 256; i++ {
                go func(id int) {
                        for j := 0; j < 1000; j++ {
                                _, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte("x")})
                                if err != nil {
                                        log.Printf("sender %d: %v", id, err)
                                }
                                time.Sleep(1 * time.Second)
                        }
                }(i)
        }
        // Keep the process alive long enough to stop the broker and observe the failures.
        time.Sleep(1 * time.Hour)
   }
   ```
   
   #### How to fix
   
   I think this option is unnecessary; the Java client has no equivalent.
   If only some partitions are unavailable, the partitioner would need to 
implement dynamic routing to ensure high availability, which is more complicated.



