This is an automated email from the ASF dual-hosted git repository.
nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 0bf2f9b8 Fix issue where DisableReplication flag does not work (#1100)
0bf2f9b8 is described below
commit 0bf2f9b86f4e540e912f73c65327b9dc6c4b57c9
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Tue Oct 10 11:49:26 2023 +0900
Fix issue where DisableReplication flag does not work (#1100)
---
pulsar/producer_partition.go | 4 +-
pulsar/producer_test.go | 92 ++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 94 insertions(+), 2 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 720c7df4..ebd292ec 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -551,13 +551,13 @@ func (p *partitionProducer) internalSend(request
*sendRequest) {
deliverAt = time.Now().Add(msg.DeliverAfter)
}
- mm := p.genMetadata(msg, uncompressedSize, deliverAt)
-
// set default ReplicationClusters when DisableReplication
if msg.DisableReplication {
msg.ReplicationClusters = []string{"__local__"}
}
+ mm := p.genMetadata(msg, uncompressedSize, deliverAt)
+
sendAsBatch := !p.options.DisableBatching &&
msg.ReplicationClusters == nil &&
deliverAt.UnixNano() < 0
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 38fec576..29ffa780 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -30,7 +30,9 @@ import (
"time"
"github.com/apache/pulsar-client-go/pulsar/internal"
+ pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/stretchr/testify/assert"
+ "google.golang.org/protobuf/proto"
"github.com/apache/pulsar-client-go/pulsar/crypto"
plog "github.com/apache/pulsar-client-go/pulsar/log"
@@ -2298,3 +2300,93 @@ func TestFailPendingMessageWithClose(t *testing.T) {
testProducer.Close()
assert.Equal(t, 0, partitionProducerImp.pendingQueue.Size())
}
+
+type pendingQueueWrapper struct {
+ pendingQueue internal.BlockingQueue
+ writtenBuffers *[]internal.Buffer
+}
+
+func (pqw *pendingQueueWrapper) Put(item interface{}) {
+ pi := item.(*pendingItem)
+ writerIdx := pi.buffer.WriterIndex()
+ buf := internal.NewBuffer(int(writerIdx))
+ buf.Write(pi.buffer.Get(0, writerIdx))
+ *pqw.writtenBuffers = append(*pqw.writtenBuffers, buf)
+ pqw.pendingQueue.Put(item)
+}
+
+func (pqw *pendingQueueWrapper) Take() interface{} {
+ return pqw.pendingQueue.Take()
+}
+
+func (pqw *pendingQueueWrapper) Poll() interface{} {
+ return pqw.pendingQueue.Poll()
+}
+
+func (pqw *pendingQueueWrapper) CompareAndPoll(compare func(interface{}) bool)
interface{} {
+ return pqw.pendingQueue.CompareAndPoll(compare)
+}
+
+func (pqw *pendingQueueWrapper) Peek() interface{} {
+ return pqw.pendingQueue.Peek()
+}
+
+func (pqw *pendingQueueWrapper) PeekLast() interface{} {
+ return pqw.pendingQueue.PeekLast()
+}
+
+func (pqw *pendingQueueWrapper) Size() int {
+ return pqw.pendingQueue.Size()
+}
+
+func (pqw *pendingQueueWrapper) ReadableSlice() []interface{} {
+ return pqw.pendingQueue.ReadableSlice()
+}
+
+func TestDisableReplication(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ testProducer, err := client.CreateProducer(ProducerOptions{
+ Topic: newTopicName(),
+ DisableBatching: true,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, testProducer)
+ defer testProducer.Close()
+
+ writtenBuffers := make([]internal.Buffer, 0)
+ pqw := &pendingQueueWrapper{
+ pendingQueue: internal.NewBlockingQueue(1000),
+ writtenBuffers: &writtenBuffers,
+ }
+
+ partitionProducerImp :=
testProducer.(*producer).producers[0].(*partitionProducer)
+ partitionProducerImp.pendingQueue = pqw
+
+ ID, err := testProducer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte("disable-replication"),
+ DisableReplication: true,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, ID)
+
+ assert.Equal(t, 1, len(writtenBuffers))
+ buf := writtenBuffers[0]
+
+ buf.Skip(4) // TOTAL_SIZE
+ cmdSize := buf.ReadUint32() // CMD_SIZE
+ buf.Skip(cmdSize) // CMD
+ buf.Skip(2) // MAGIC_NUMBER
+ buf.Skip(4) // CHECKSUM
+ metadataSize := buf.ReadUint32() // METADATA_SIZE
+ metadata := buf.Read(metadataSize) // METADATA
+
+ var msgMetadata pb.MessageMetadata
+ err = proto.Unmarshal(metadata, &msgMetadata)
+ assert.NoError(t, err)
+ assert.Equal(t, []string{"__local__"}, msgMetadata.GetReplicateTo())
+}