archmangler commented on issue #15045:
URL: https://github.com/apache/pulsar/issues/15045#issuecomment-1090407287

   @tjiuming  I can reproduce the issue at will by ctrl-c of the client after a 
read, it's as if the client does not terminate properly , leaving the broker in 
an inconsistent state:
   
   See demonstration below:
   
   - Run script:
   ```
   root@pulsar-toolset-0:/pulsar/development# ./pulse 
   2022/04/06 15:23:41.601 c_client.go:68: [info] INFO  | ConnectionPool:84 | 
Created connection for pulsar://pulsar-broker.pulsar.svc.cluster.local:6650
   2022/04/06 15:23:41.607 c_client.go:68: [info] INFO  | ClientConnection:364 
| [192.168.4.54:59504 -> 192.168.6.203:6650] Connected to broker
   2022/04/06 15:23:41.612 c_client.go:68: [info] INFO  | HandlerBase:54 | 
[persistent://ragnarok/transactions/requests, reader-ac05b14cf9, 0] Getting 
connection from pool
   2022/04/06 15:23:41.616 c_client.go:68: [info] INFO  | ConnectionPool:84 | 
Created connection for 
pulsar://pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650
   2022/04/06 15:23:41.618 c_client.go:68: [info] INFO  | ClientConnection:364 
| [192.168.4.54:50092 -> 192.168.5.242:6650] Connected to broker
   2022/04/06 15:23:41.628 c_client.go:68: [info] INFO  | ConsumerImpl:220 | 
[persistent://ragnarok/transactions/requests, reader-ac05b14cf9, 0] Created 
consumer on broker [192.168.4.54:50092 -> 192.168.5.242:6650] 
   1  ->  (162,0,-1,0)  ->  [{redacted"}]
   2  ->  
   .
   .
   .
   16  ->  (162,15,-1,0)  ->  [{redacted]
   
   reached stream end. breaking out.
   
   ```
   
   - Rerun script:
   
   ```
   root@pulsar-toolset-0:/pulsar/development# ./pulse 
   2022/04/06 15:25:31.951 c_client.go:68: [info] INFO  | ConnectionPool:84 | 
Created connection for pulsar://pulsar-broker.pulsar.svc.cluster.local:6650
   2022/04/06 15:25:31.969 c_client.go:68: [info] INFO  | ClientConnection:364 
| [192.168.4.54:50590 -> 192.168.5.242:6650] Connected to broker
   2022/04/06 15:25:31.973 c_client.go:68: [info] INFO  | HandlerBase:54 | 
[persistent://ragnarok/transactions/requests, reader-9316fe9b08, 0] Getting 
connection from pool
   2022/04/06 15:25:31.974 c_client.go:68: [info] INFO  | ConnectionPool:84 | 
Created connection for 
pulsar://pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650
   2022/04/06 15:25:31.975 c_client.go:68: [info] INFO  | ClientConnection:364 
| [192.168.4.54:50592 -> 192.168.5.242:6650] Connected to broker
   2022/04/06 15:25:31.979 c_client.go:68: [info] WARN  | ClientConnection:971 
| [192.168.4.54:50592 -> 192.168.5.242:6650] Received error response from 
server: BrokerPersistenceError -- req_id: 0
   2022/04/06 15:25:31.979 c_client.go:68: [info] ERROR | ConsumerImpl:263 | 
[persistent://ragnarok/transactions/requests, reader-9316fe9b08, 0] Failed to 
create consumer: BrokerPersistenceError
   2022/04/06 15:25:31 Could not create reader: Failed to create Reader: 
BrokerPersistenceError
   ```
   
   - Restart brokers. Then run  again (NOTE: I always see `Failed to close 
consumer: ConnectError`) : on the first rurn
   
   ```
   root@pulsar-toolset-0:/pulsar/development# ./pulse 
   
   2022/04/06 15:27:07.893 c_client.go:68: [info] INFO  | ConnectionPool:84 | 
Created connection for pulsar://pulsar-broker.pulsar.svc.cluster.local:6650
   2022/04/06 15:27:07.899 c_client.go:68: [info] INFO  | ClientConnection:364 
| [192.168.4.54:45236 -> 192.168.4.194:6650] Connected to broker
   2022/04/06 15:27:07.904 c_client.go:68: [info] INFO  | HandlerBase:54 | 
[persistent://ragnarok/transactions/requests, reader-6b2a574e90, 0] Getting 
connection from pool
   2022/04/06 15:27:07.908 c_client.go:68: [info] INFO  | ConnectionPool:84 | 
Created connection for 
pulsar://pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:6650
   2022/04/06 15:27:07.910 c_client.go:68: [info] INFO  | ClientConnection:364 
| [192.168.4.54:46276 -> 192.168.2.18:6650] Connected to broker
   2022/04/06 15:27:07.923 c_client.go:68: [info] INFO  | ConsumerImpl:220 | 
[persistent://ragnarok/transactions/requests, reader-6b2a574e90, 0] Created 
consumer on broker [192.168.4.54:46276 -> 192.168.2.18:6650] 
   1  ->  (162,0,-1,0)  ->  [{redacted}]
   2  ->  (162,1,-1,0)  ->  
   
   .
   .
   
   16  ->  (162,15,-1,0)  ->  [{redacted}]
   
   reached stream end. breaking out.
   
   2022/04/06 15:27:08.099 c_client.go:68: [info] INFO  | ConsumerImpl:871 | 
[persistent://ragnarok/transactions/requests, reader-6b2a574e90, 0] Closing 
consumer for topic persistent://ragnarok/transactions/requests
   2022/04/06 15:27:08.131 c_client.go:68: [info] ERROR | ClientConnection:523 
| [192.168.4.54:46276 -> 192.168.2.18:6650] Read failed: End of file
   2022/04/06 15:27:08.131 c_client.go:68: [info] INFO  | ClientConnection:1436 
| [192.168.4.54:46276 -> 192.168.2.18:6650] Connection closed
   2022/04/06 15:27:08.131 c_client.go:68: [info] ERROR | ConsumerImpl:929 | 
[persistent://ragnarok/transactions/requests, reader-6b2a574e90, 0] Failed to 
close consumer: ConnectError
   root@pulsar-toolset-0:/pulsar/development# 
   root@pulsar-toolset-0:/pulsar/development# 
   
   ```
   
   Run  multiples more:
   
   
   ```
   root@pulsar-toolset-0:/pulsar/development# ./pulse 
   2022/04/06 15:29:09.238 c_client.go:68: [info] INFO  | ConnectionPool:84 | 
Created connection for pulsar://pulsar-broker.pulsar.svc.cluster.local:6650
   2022/04/06 15:29:09.243 c_client.go:68: [info] INFO  | ClientConnection:364 
| [192.168.4.54:45784 -> 192.168.4.194:6650] Connected to broker
   2022/04/06 15:29:09.247 c_client.go:68: [info] INFO  | HandlerBase:54 | 
[persistent://ragnarok/transactions/requests, reader-1c1ee3b179, 0] Getting 
connection from pool
   2022/04/06 15:29:09.248 c_client.go:68: [info] INFO  | ConnectionPool:84 | 
Created connection for 
pulsar://pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650
   2022/04/06 15:29:09.249 c_client.go:68: [info] INFO  | ClientConnection:364 
| [192.168.4.54:45786 -> 192.168.4.194:6650] Connected to broker
   2022/04/06 15:29:09.256 c_client.go:68: [info] WARN  | ClientConnection:971 
| [192.168.4.54:45786 -> 192.168.4.194:6650] Received error response from 
server: BrokerPersistenceError -- req_id: 0
   2022/04/06 15:29:09.256 c_client.go:68: [info] ERROR | ConsumerImpl:263 | 
[persistent://ragnarok/transactions/requests, reader-1c1ee3b179, 0] Failed to 
create consumer: BrokerPersistenceError
   2022/04/06 15:29:09 Could not create reader: Failed to create Reader: 
BrokerPersistenceError
   root@pulsar-toolset-0:/pulsar/development# 
   root@pulsar-toolset-0:/pulsar/development# 
   
   
   root@pulsar-toolset-0:/pulsar/development# ./pulse 
   2022/04/06 15:29:10.500 c_client.go:68: [info] INFO  | ConnectionPool:84 | 
Created connection for pulsar://pulsar-broker.pulsar.svc.cluster.local:6650
   2022/04/06 15:29:10.516 c_client.go:68: [info] INFO  | ClientConnection:364 
| [192.168.4.54:46718 -> 192.168.2.70:6650] Connected to broker
   2022/04/06 15:29:10.526 c_client.go:68: [info] INFO  | HandlerBase:54 | 
[persistent://ragnarok/transactions/requests, reader-925c2cbe73, 0] Getting 
connection from pool
   2022/04/06 15:29:10.530 c_client.go:68: [info] INFO  | ConnectionPool:84 | 
Created connection for 
pulsar://pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650
   2022/04/06 15:29:10.532 c_client.go:68: [info] INFO  | ClientConnection:364 
| [192.168.4.54:45790 -> 192.168.4.194:6650] Connected to broker
   2022/04/06 15:29:10.535 c_client.go:68: [info] WARN  | ClientConnection:971 
| [192.168.4.54:45790 -> 192.168.4.194:6650] Received error response from 
server: BrokerPersistenceError -- req_id: 0
   2022/04/06 15:29:10.535 c_client.go:68: [info] ERROR | ConsumerImpl:263 | 
[persistent://ragnarok/transactions/requests, reader-925c2cbe73, 0] Failed to 
create consumer: BrokerPersistenceError
   2022/04/06 15:29:10 Could not create reader: Failed to create Reader: 
BrokerPersistenceError
   root@pulsar-toolset-0:/pulsar/development# 
   root@pulsar-toolset-0:/pulsar/development# 
   
   
   root@pulsar-toolset-0:/pulsar/development# ./pulse 
   2022/04/06 15:29:11.987 c_client.go:68: [info] INFO  | ConnectionPool:84 | 
Created connection for pulsar://pulsar-broker.pulsar.svc.cluster.local:6650
   2022/04/06 15:29:11.993 c_client.go:68: [info] INFO  | ClientConnection:364 
| [192.168.4.54:35980 -> 192.168.6.87:6650] Connected to broker
   2022/04/06 15:29:11.999 c_client.go:68: [info] INFO  | HandlerBase:54 | 
[persistent://ragnarok/transactions/requests, reader-6d58cde4d9, 0] Getting 
connection from pool
   2022/04/06 15:29:12.000 c_client.go:68: [info] INFO  | ConnectionPool:84 | 
Created connection for 
pulsar://pulsar-broker-2.pulsar-broker.pulsar.svc.cluster.local:6650
   2022/04/06 15:29:12.002 c_client.go:68: [info] INFO  | ClientConnection:364 
| [192.168.4.54:45794 -> 192.168.4.194:6650] Connected to broker
   2022/04/06 15:29:12.005 c_client.go:68: [info] WARN  | ClientConnection:971 
| [192.168.4.54:45794 -> 192.168.4.194:6650] Received error response from 
server: BrokerPersistenceError -- req_id: 0
   2022/04/06 15:29:12.005 c_client.go:68: [info] ERROR | ConsumerImpl:263 | 
[persistent://ragnarok/transactions/requests, reader-6d58cde4d9, 0] Failed to 
create consumer: BrokerPersistenceError
   2022/04/06 15:29:12 Could not create reader: Failed to create Reader: 
BrokerPersistenceError
   root@pulsar-toolset-0:/pulsar/development# 
   root@pulsar-toolset-0:/pulsar/development# 
   
   ```
   
   
   Restarting the brokers allows us to connect once again.
   
   The test code:
   
   
   ```
   package main
   
   import (
           "context"
           "fmt"
           "log"
   
           "github.com/apache/pulsar/pulsar-client-go/pulsar"
   )
   
   func main() {
   
           //end of read stream
           stop := 16
   
           // Instantiate a Pulsar client
           client, err := pulsar.NewClient(pulsar.ClientOptions{
                   URL: "pulsar://pulsar-broker.pulsar.svc.cluster.local:6650",
           })
   
           if err != nil {
                   log.Fatalf("Could not create client: %v", err)
           }
   
           // Use the client to instantiate a reader
           reader, err := client.CreateReader(pulsar.ReaderOptions{
                   Topic:          "ragnarok/transactions/requests",
                   StartMessageID: pulsar.EarliestMessage,
           })
   
           if err != nil {
                   log.Fatalf("Could not create reader: %v", err)
           }
   
           ctx := context.Background()
   
           // Listen on the topic for incoming messages
           cnt := 0
   
           for {
   
                   msg, err := reader.Next(ctx)
   
                   if err != nil {
                           log.Fatalf("Error reading from topic: %v", err)
                   } else {
                           cnt++
                           content := string(msg.Payload())
   
                           fmt.Println(cnt, " -> ", msg.ID(), " -> ", content)
   
                           if cnt == stop {
                                   fmt.Println("reached stream end. breaking 
out.")
                                   defer reader.Close()
                                   break
                           }
   
                   }
           }
   }
   
   
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to