flowchartsman opened a new issue #452:
URL: https://github.com/apache/pulsar-client-go/issues/452


   I am creating a reader to read backwards in time off of a topic, and I have 
noticed that I receive the above error if I seek by time after creating the 
reader with `StartMessageID: pulsar.LatestMessageID()` if I use 
`pulsar.EarliestMessageID()` it seems to behave correctly:
   
   ```
   ../countconsumer localhost:6651 testten testnam testtopic
   new client
   persistent://testten/testnam/testtopic
   new reader
   INFO[0000] [Connecting to broker]                        
remote_addr="pulsar://localhost:6651"
   INFO[0000] [TCP connection established]                  
local_addr="127.0.0.1:36588" remote_addr="pulsar://localhost:6651"
   INFO[0000] [Connection is ready]                         
local_addr="127.0.0.1:36588" remote_addr="pulsar://localhost:6651"
   INFO[0000] [Connecting to broker]                        
remote_addr="pulsar://localhost:6651"
   INFO[0000] [TCP connection established]                  
local_addr="127.0.0.1:36596" remote_addr="pulsar://localhost:6651"
   INFO[0000] [Connection is ready]                         
local_addr="127.0.0.1:36596" remote_addr="pulsar://localhost:6651"
   INFO[0000] [Connected consumer]                          consumerID=1 name= 
subscription=reader-ruqtj topic="persistent://testten/testnam/testtopic"
   INFO[0000] [Created consumer]                            consumerID=1 name= 
subscription=reader-ruqtj topic="persistent://testten/testnam/testtopic"
   new seek
   INFO[0000] Broker notification of Closed consumer: [1]   
local_addr="127.0.0.1:36596" remote_addr="pulsar://localhost:6651"
   INFO[0000] [Reconnecting to broker in  100ms]            consumerID=1 name= 
subscription=reader-ruqtj topic="persistent://testten/testnam/testtopic"
   ERRO[0000] [Failed to get last message id]               consumerID=1 
error="server error: MetadataError: Consumer not found" name= 
subscription=reader-ruqtj topic="persistent://testten/testnam/testtopic"
   ERRO[0000] [Failed to get last message id from broker]   error="server 
error: MetadataError: Consumer not found" 
topic="persistent://testten/testnam/testtopic"
   ```
   
   Code attached.
   
   ```go
   package main
   
   import (
        "context"
        "fmt"
        "log"
        "os"
        "os/signal"
        "strings"
        "time"
   
        "github.com/dustin/go-humanize"
        "go.uber.org/atomic"
   
        "github.com/apache/pulsar-client-go/pulsar"
   )
   
   // TopicName returns a fully-qualified pulsar topic string
   func TopicName(tenant, namespace, topic string) string {
        return fmt.Sprintf("persistent://%s/%s/%s", tenant, namespace, topic)
   }
   
   func main() {
        log.SetFlags(0)
   
        log.Println("new client")
        client, err := pulsar.NewClient(pulsar.ClientOptions{
                URL: "pulsar://" + os.Args[1],
        })
        if err != nil {
                log.Fatal(err)
        }
        defer client.Close()
   
        topic := TopicName(os.Args[2], os.Args[3], os.Args[4])
        log.Println(topic)
   
        log.Println("new reader")
        reader, err := client.CreateReader(pulsar.ReaderOptions{
                Topic: topic,
                StartMessageID:          pulsar.LatestMessageID(),
        })
        if err != nil {
                log.Fatal(err)
        }
        log.Println("new seek")
        time.Sleep(100 * time.Millisecond)
        reader.SeekByTime(time.Now().Add(-72 * time.Hour))
        time.Sleep(100 * time.Millisecond)
   
        ctx, cFunc := context.WithCancel(context.Background())
        sig := make(chan os.Signal, 1)
        signal.Notify(sig, os.Interrupt, os.Kill)
        go func() {
                <-sig
                cFunc()
        }()
   
        var maxPayloadSize int
        var count atomic.Uint32
        go func() {
                t := time.NewTicker(3 * time.Second)
                for {
                        <-t.C
                        log.Printf("entries: %d\n", count.Load())
                }
        }()
   MainLoop:
        for {
                select {
                case <-ctx.Done():
                        log.Println("closing gracefully")
                        reader.Close()
                        break MainLoop
                default:
                        // get message
                }
                if reader.HasNext() {
                        msg, err := reader.Next(ctx)
                        if err != nil {
                                log.Println("consumer receive error:", err)
                                continue
                        }
                        count.Add(1)
   
                        if len(msg.Payload()) > maxPayloadSize {
                                maxPayloadSize = len(msg.Payload())
                                h := humanize.Bytes(uint64(maxPayloadSize))
                                log.Printf("new max payload size: %d (%s)\n", 
maxPayloadSize, h)
                                filename := 
strings.ReplaceAll(fmt.Sprintf("%s-%s", os.Args[4], h), " ", "")
                                out, _ := os.Create(filename)
                                out.Write(msg.Payload())
                                if err := out.Close(); err != nil {
                                        log.Fatal(err)
                                }
                        }
   
                } else {
                        log.Println("done!")
                        break MainLoop
                }
        }
   }
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to