flowchartsman opened a new issue #452:
URL: https://github.com/apache/pulsar-client-go/issues/452
I am creating a reader to read backwards in time off of a topic, and I have
noticed that I receive the above error if I seek by time after creating the
reader with `StartMessageID: pulsar.LatestMessageID()` if I use
`pulsar.EarliestMessageID()` it seems to behave correctly:
```
../countconsumer localhost:6651 testten testnam testtopic
new client
persistent://testten/testnam/testtopic
new reader
INFO[0000] [Connecting to broker]
remote_addr="pulsar://localhost:6651"
INFO[0000] [TCP connection established]
local_addr="127.0.0.1:36588" remote_addr="pulsar://localhost:6651"
INFO[0000] [Connection is ready]
local_addr="127.0.0.1:36588" remote_addr="pulsar://localhost:6651"
INFO[0000] [Connecting to broker]
remote_addr="pulsar://localhost:6651"
INFO[0000] [TCP connection established]
local_addr="127.0.0.1:36596" remote_addr="pulsar://localhost:6651"
INFO[0000] [Connection is ready]
local_addr="127.0.0.1:36596" remote_addr="pulsar://localhost:6651"
INFO[0000] [Connected consumer] consumerID=1 name=
subscription=reader-ruqtj topic="persistent://testten/testnam/testtopic"
INFO[0000] [Created consumer] consumerID=1 name=
subscription=reader-ruqtj topic="persistent://testten/testnam/testtopic"
new seek
INFO[0000] Broker notification of Closed consumer: [1]
local_addr="127.0.0.1:36596" remote_addr="pulsar://localhost:6651"
INFO[0000] [Reconnecting to broker in 100ms] consumerID=1 name=
subscription=reader-ruqtj topic="persistent://testten/testnam/testtopic"
ERRO[0000] [Failed to get last message id] consumerID=1
error="server error: MetadataError: Consumer not found" name=
subscription=reader-ruqtj topic="persistent://testten/testnam/testtopic"
ERRO[0000] [Failed to get last message id from broker] error="server
error: MetadataError: Consumer not found"
topic="persistent://testten/testnam/testtopic"
```
Code attached.
```go
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"strings"
"time"
"github.com/dustin/go-humanize"
"go.uber.org/atomic"
"github.com/apache/pulsar-client-go/pulsar"
)
// TopicName returns a fully-qualified pulsar topic string
func TopicName(tenant, namespace, topic string) string {
return fmt.Sprintf("persistent://%s/%s/%s", tenant, namespace, topic)
}
func main() {
log.SetFlags(0)
log.Println("new client")
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://" + os.Args[1],
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
topic := TopicName(os.Args[2], os.Args[3], os.Args[4])
log.Println(topic)
log.Println("new reader")
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: topic,
StartMessageID: pulsar.LatestMessageID(),
})
if err != nil {
log.Fatal(err)
}
log.Println("new seek")
time.Sleep(100 * time.Millisecond)
reader.SeekByTime(time.Now().Add(-72 * time.Hour))
time.Sleep(100 * time.Millisecond)
ctx, cFunc := context.WithCancel(context.Background())
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, os.Kill)
go func() {
<-sig
cFunc()
}()
var maxPayloadSize int
var count atomic.Uint32
go func() {
t := time.NewTicker(3 * time.Second)
for {
<-t.C
log.Printf("entries: %d\n", count.Load())
}
}()
MainLoop:
for {
select {
case <-ctx.Done():
log.Println("closing gracefully")
reader.Close()
break MainLoop
default:
// get message
}
if reader.HasNext() {
msg, err := reader.Next(ctx)
if err != nil {
log.Println("consumer receive error:", err)
continue
}
count.Add(1)
if len(msg.Payload()) > maxPayloadSize {
maxPayloadSize = len(msg.Payload())
h := humanize.Bytes(uint64(maxPayloadSize))
log.Printf("new max payload size: %d (%s)\n",
maxPayloadSize, h)
filename :=
strings.ReplaceAll(fmt.Sprintf("%s-%s", os.Args[4], h), " ", "")
out, _ := os.Create(filename)
out.Write(msg.Payload())
if err := out.Close(); err != nil {
log.Fatal(err)
}
}
} else {
log.Println("done!")
break MainLoop
}
}
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]