This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new d0d5d0a Add properties filed for batch (#683)
d0d5d0a is described below
commit d0d5d0ae403717505f85b86c100fac6ff4d7dcf6
Author: xiaolong ran <[email protected]>
AuthorDate: Fri Dec 10 15:12:53 2021 +0800
Add properties filed for batch (#683)
Signed-off-by: xiaolongran <[email protected]>
### Motivation
Currently, when we disable batch in Producer, in `handleSend()` of
`serverCnx.java`, the `msgMetadata.hasNumMessagesInBatch()` is **true** and
`msgMetadata.getNumMessagesInBatch()` is **1**.
At this point, if we get the Properties object we set on the producer side
on the broker side, the display is empty.
Go SDK set Properties:
```
// disable batch
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "topic-1",
DisableBatching: true,
})
// set properties for every message
producer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Properties: map[string]string{
"key-1": "value-1",
},
});
```
Broker get message properties from entry metadata is null:
```
ByteBuf metadataAndPayload = entry.getDataBuffer();
MessageMetadata msgMetadata =
Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1);
```
And `msgMetadata.getPropertiesCount() <= 0`.
### Modifications
Add properties filed in Add single message to batchContainer
---
pulsar/internal/batch_builder.go | 2 ++
1 file changed, 2 insertions(+)
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index d08af53..7e47304 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -191,6 +191,7 @@ func (bc *batchContainer) Add(
bc.msgMetadata.ProducerName = &bc.producerName
bc.msgMetadata.ReplicateTo = replicateTo
bc.msgMetadata.PartitionKey = metadata.PartitionKey
+ bc.msgMetadata.Properties = metadata.Properties
if deliverAt.UnixNano() > 0 {
bc.msgMetadata.DeliverAtTime =
proto.Int64(int64(TimestampMillis(deliverAt)))
@@ -211,6 +212,7 @@ func (bc *batchContainer) reset() {
bc.callbacks = []interface{}{}
bc.msgMetadata.ReplicateTo = nil
bc.msgMetadata.DeliverAtTime = nil
+ bc.msgMetadata.Properties = nil
}
// Flush all the messages buffered in the client and wait until all messages
have been successfully persisted.