This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new b58f115 [PIP 90] go client retrieve broker metadata (#745)
b58f115 is described below
commit b58f1157e4aadfc662c3d5fc19f2de55cc9ff411
Author: ZhangJian He <[email protected]>
AuthorDate: Tue Mar 22 12:15:14 2022 +0800
[PIP 90] go client retrieve broker metadata (#745)
[PIP 90] go client retrieve broker metadata
---
pulsar/consumer_partition.go | 24 ++++++++++++++++++++--
pulsar/impl_message.go | 10 +++++++++
pulsar/internal/buffer.go | 6 ++++++
pulsar/internal/commands.go | 20 ++++++++++++++++--
pulsar/internal/commands_test.go | 18 ++++++++++++++++
pulsar/internal/connection.go | 5 +++--
.../pulsartracing/message_carrier_util_test.go | 8 ++++++++
pulsar/message.go | 8 ++++++++
pulsar/negative_acks_tracker_test.go | 16 +++++++++++++++
9 files changed, 109 insertions(+), 6 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 04a39c5..b06474d 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -498,12 +498,17 @@ func (pc *partitionConsumer) MessageReceived(response
*pb.CommandMessage, header
pbMsgID := response.GetMessageId()
reader := internal.NewMessageReader(headersAndPayload)
+ brokerMetadata, err := reader.ReadBrokerMetadata()
+ if err != nil {
+ // todo optimize use more appropriate error codes
+ pc.discardCorruptedMessage(pbMsgID,
pb.CommandAck_BatchDeSerializeError)
+ return err
+ }
msgMeta, err := reader.ReadMessageMetadata()
if err != nil {
pc.discardCorruptedMessage(pbMsgID,
pb.CommandAck_ChecksumMismatch)
return err
}
-
decryptedPayload, err :=
pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
// error decrypting the payload
if err != nil {
@@ -597,7 +602,18 @@ func (pc *partitionConsumer) MessageReceived(response
*pb.CommandMessage, header
pc.AckID(msgID)
continue
}
-
+ var messageIndex *uint64
+ var brokerPublishTime *time.Time
+ if brokerMetadata != nil {
+ if brokerMetadata.Index != nil {
+ aux := brokerMetadata.GetIndex() -
uint64(numMsgs) + uint64(i) + 1
+ messageIndex = &aux
+ }
+ if brokerMetadata.BrokerTimestamp != nil {
+ aux :=
timeFromUnixTimestampMillis(*brokerMetadata.BrokerTimestamp)
+ brokerPublishTime = &aux
+ }
+ }
// set the consumer so we know how to ack the message id
msgID.consumer = pc
var msg *message
@@ -616,6 +632,8 @@ func (pc *partitionConsumer) MessageReceived(response
*pb.CommandMessage, header
replicatedFrom:
msgMeta.GetReplicatedFrom(),
redeliveryCount:
response.GetRedeliveryCount(),
orderingKey: string(smm.OrderingKey),
+ index: messageIndex,
+ brokerPublishTime: brokerPublishTime,
}
} else {
msg = &message{
@@ -631,6 +649,8 @@ func (pc *partitionConsumer) MessageReceived(response
*pb.CommandMessage, header
replicationClusters: msgMeta.GetReplicateTo(),
replicatedFrom:
msgMeta.GetReplicatedFrom(),
redeliveryCount:
response.GetRedeliveryCount(),
+ index: messageIndex,
+ brokerPublishTime: brokerPublishTime,
}
}
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index a9809aa..3216676 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -241,6 +241,8 @@ type message struct {
redeliveryCount uint32
schema Schema
encryptionContext *EncryptionContext
+ index *uint64
+ brokerPublishTime *time.Time
}
func (msg *message) Topic() string {
@@ -299,6 +301,14 @@ func (msg *message) GetEncryptionContext()
*EncryptionContext {
return msg.encryptionContext
}
+func (msg *message) Index() *uint64 {
+ return msg.index
+}
+
+func (msg *message) BrokerPublishTime() *time.Time {
+ return msg.brokerPublishTime
+}
+
func newAckTracker(size int) *ackTracker {
var batchIDs *big.Int
if size <= 64 {
diff --git a/pulsar/internal/buffer.go b/pulsar/internal/buffer.go
index f3b8fe6..b3e23fb 100644
--- a/pulsar/internal/buffer.go
+++ b/pulsar/internal/buffer.go
@@ -38,6 +38,8 @@ type Buffer interface {
Read(size uint32) []byte
+ Skip(size uint32)
+
Get(readerIndex uint32, size uint32) []byte
ReadableSlice() []byte
@@ -122,6 +124,10 @@ func (b *buffer) Read(size uint32) []byte {
return res
}
+func (b *buffer) Skip(size uint32) {
+ b.readerIdx += size
+}
+
func (b *buffer) Get(readerIdx uint32, size uint32) []byte {
return b.data[readerIdx : readerIdx+size]
}
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index b91c0b6..7fd1885 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -18,6 +18,7 @@
package internal
import (
+ "encoding/binary"
"errors"
"fmt"
@@ -34,8 +35,9 @@ const (
// MessageFramePadding is for metadata and other frame headers
MessageFramePadding = 10 * 1024
// MaxFrameSize limit the maximum size that pulsar allows for messages
to be sent.
- MaxFrameSize = MaxMessageSize + MessageFramePadding
- magicCrc32c uint16 = 0x0e01
+ MaxFrameSize = MaxMessageSize + MessageFramePadding
+ magicCrc32c uint16 = 0x0e01
+ magicBrokerEntryMetadata uint16 = 0x0e02
)
// ErrCorruptedMessage is the error returned by ReadMessageData when it has
detected corrupted data.
@@ -119,6 +121,20 @@ func (r *MessageReader) ReadMessageMetadata()
(*pb.MessageMetadata, error) {
return &meta, nil
}
+func (r *MessageReader) ReadBrokerMetadata() (*pb.BrokerEntryMetadata, error) {
+ magicNumber :=
binary.BigEndian.Uint16(r.buffer.Get(r.buffer.ReaderIndex(), 2))
+ if magicNumber != magicBrokerEntryMetadata {
+ return nil, nil
+ }
+ r.buffer.Skip(2)
+ size := r.buffer.ReadUint32()
+ var brokerEntryMetadata pb.BrokerEntryMetadata
+ if err := proto.Unmarshal(r.buffer.Read(size), &brokerEntryMetadata);
err != nil {
+ return nil, err
+ }
+ return &brokerEntryMetadata, nil
+}
+
func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte,
error) {
if r.buffer.ReadableBytes() == 0 && r.buffer.Capacity() > 0 {
return nil, nil, ErrEOM
diff --git a/pulsar/internal/commands_test.go b/pulsar/internal/commands_test.go
index b43335a..c236e10 100644
--- a/pulsar/internal/commands_test.go
+++ b/pulsar/internal/commands_test.go
@@ -70,6 +70,19 @@ func TestReadMessageMetadata(t *testing.T) {
assert.Equal(t, 10, int(meta.GetNumMessagesInBatch()))
}
+func TestReadBrokerEntryMetadata(t *testing.T) {
+ // read old style message (not batched)
+ reader := NewMessageReaderFromArray(brokerEntryMeta)
+ meta, err := reader.ReadBrokerMetadata()
+ if err != nil {
+ t.Fatal(err)
+ }
+ var expectedBrokerTimestamp uint64 = 1646983036054
+ assert.Equal(t, expectedBrokerTimestamp, *meta.BrokerTimestamp)
+ var expectedIndex uint64 = 5
+ assert.Equal(t, expectedIndex, *meta.Index)
+}
+
func TestReadMessageOldFormat(t *testing.T) {
reader := NewMessageReaderFromArray(rawCompatSingleMessage)
_, err := reader.ReadMessageMetadata()
@@ -210,3 +223,8 @@ var rawBatchMessage10 = []byte{
0x28, 0x05, 0x40, 0x09, 0x68, 0x65, 0x6c, 0x6c,
0x6f,
}
+
+var brokerEntryMeta = []byte{
+ 0x0e, 0x02, 0x00, 0x00, 0x00, 0x09, 0x08, 0x96,
+ 0xf9, 0xda, 0xbe, 0xf7, 0x2f, 0x10, 0x05,
+}
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 1e2009c..a025abf 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -43,7 +43,7 @@ const (
PulsarVersion = "0.1"
ClientVersionString = "Pulsar Go " + PulsarVersion
- PulsarProtocolVersion = int32(pb.ProtocolVersion_v13)
+ PulsarProtocolVersion = int32(pb.ProtocolVersion_v18)
)
type TLSOptions struct {
@@ -292,7 +292,8 @@ func (c *connection) doHandshake() bool {
AuthMethodName: proto.String(c.auth.Name()),
AuthData: authData,
FeatureFlags: &pb.FeatureFlags{
- SupportsAuthRefresh: proto.Bool(true),
+ SupportsAuthRefresh: proto.Bool(true),
+ SupportsBrokerEntryMetadata: proto.Bool(true),
},
}
diff --git a/pulsar/internal/pulsartracing/message_carrier_util_test.go
b/pulsar/internal/pulsartracing/message_carrier_util_test.go
index 677a7ff..7f25578 100644
--- a/pulsar/internal/pulsartracing/message_carrier_util_test.go
+++ b/pulsar/internal/pulsartracing/message_carrier_util_test.go
@@ -123,3 +123,11 @@ func (msg *mockConsumerMessage) ProducerName() string {
func (msg *mockConsumerMessage) GetEncryptionContext()
*pulsar.EncryptionContext {
return &pulsar.EncryptionContext{}
}
+
+func (msg *mockConsumerMessage) Index() *uint64 {
+ return nil
+}
+
+func (msg *mockConsumerMessage) BrokerPublishTime() *time.Time {
+ return nil
+}
diff --git a/pulsar/message.go b/pulsar/message.go
index 3779caf..b88f158 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -121,6 +121,14 @@ type Message interface {
// GetEncryptionContext returns the ecryption context of the message.
// It will be used by the application to parse the undecrypted message.
GetEncryptionContext() *EncryptionContext
+
+ // Index returns index from broker entry metadata,
+ // or empty if the feature is not enabled in the broker.
+ Index() *uint64
+
+ // BrokerPublishTime returns broker publish time from broker entry
metadata,
+ // or empty if the feature is not enabled in the broker.
+ BrokerPublishTime() *time.Time
}
// MessageID identifier for a particular message
diff --git a/pulsar/negative_acks_tracker_test.go
b/pulsar/negative_acks_tracker_test.go
index 51965ea..e47fb09 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -237,6 +237,14 @@ func (msg *mockMessage1) GetEncryptionContext()
*EncryptionContext {
return &EncryptionContext{}
}
+func (msg *mockMessage1) Index() *uint64 {
+ return nil
+}
+
+func (msg *mockMessage1) BrokerPublishTime() *time.Time {
+ return nil
+}
+
type mockMessage2 struct {
properties map[string]string
}
@@ -300,3 +308,11 @@ func (msg *mockMessage2) ProducerName() string {
func (msg *mockMessage2) GetEncryptionContext() *EncryptionContext {
return &EncryptionContext{}
}
+
+func (msg *mockMessage2) Index() *uint64 {
+ return nil
+}
+
+func (msg *mockMessage2) BrokerPublishTime() *time.Time {
+ return nil
+}