This is an automated email from the ASF dual-hosted git repository.

nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bf2f9b8 Fix issue where DisableReplication flag does not work (#1100)
0bf2f9b8 is described below

commit 0bf2f9b86f4e540e912f73c65327b9dc6c4b57c9
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Tue Oct 10 11:49:26 2023 +0900

    Fix issue where DisableReplication flag does not work (#1100)
---
 pulsar/producer_partition.go |  4 +-
 pulsar/producer_test.go      | 92 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 94 insertions(+), 2 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 720c7df4..ebd292ec 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -551,13 +551,13 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
                deliverAt = time.Now().Add(msg.DeliverAfter)
        }
 
-       mm := p.genMetadata(msg, uncompressedSize, deliverAt)
-
        // set default ReplicationClusters when DisableReplication
        if msg.DisableReplication {
                msg.ReplicationClusters = []string{"__local__"}
        }
 
+       mm := p.genMetadata(msg, uncompressedSize, deliverAt)
+
        sendAsBatch := !p.options.DisableBatching &&
                msg.ReplicationClusters == nil &&
                deliverAt.UnixNano() < 0
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 38fec576..29ffa780 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -30,7 +30,9 @@ import (
        "time"
 
        "github.com/apache/pulsar-client-go/pulsar/internal"
+       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/stretchr/testify/assert"
+       "google.golang.org/protobuf/proto"
 
        "github.com/apache/pulsar-client-go/pulsar/crypto"
        plog "github.com/apache/pulsar-client-go/pulsar/log"
@@ -2298,3 +2300,93 @@ func TestFailPendingMessageWithClose(t *testing.T) {
        testProducer.Close()
        assert.Equal(t, 0, partitionProducerImp.pendingQueue.Size())
 }
+
+type pendingQueueWrapper struct {
+       pendingQueue   internal.BlockingQueue
+       writtenBuffers *[]internal.Buffer
+}
+
+func (pqw *pendingQueueWrapper) Put(item interface{}) {
+       pi := item.(*pendingItem)
+       writerIdx := pi.buffer.WriterIndex()
+       buf := internal.NewBuffer(int(writerIdx))
+       buf.Write(pi.buffer.Get(0, writerIdx))
+       *pqw.writtenBuffers = append(*pqw.writtenBuffers, buf)
+       pqw.pendingQueue.Put(item)
+}
+
+func (pqw *pendingQueueWrapper) Take() interface{} {
+       return pqw.pendingQueue.Take()
+}
+
+func (pqw *pendingQueueWrapper) Poll() interface{} {
+       return pqw.pendingQueue.Poll()
+}
+
+func (pqw *pendingQueueWrapper) CompareAndPoll(compare func(interface{}) bool) 
interface{} {
+       return pqw.pendingQueue.CompareAndPoll(compare)
+}
+
+func (pqw *pendingQueueWrapper) Peek() interface{} {
+       return pqw.pendingQueue.Peek()
+}
+
+func (pqw *pendingQueueWrapper) PeekLast() interface{} {
+       return pqw.pendingQueue.PeekLast()
+}
+
+func (pqw *pendingQueueWrapper) Size() int {
+       return pqw.pendingQueue.Size()
+}
+
+func (pqw *pendingQueueWrapper) ReadableSlice() []interface{} {
+       return pqw.pendingQueue.ReadableSlice()
+}
+
+func TestDisableReplication(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       testProducer, err := client.CreateProducer(ProducerOptions{
+               Topic:           newTopicName(),
+               DisableBatching: true,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, testProducer)
+       defer testProducer.Close()
+
+       writtenBuffers := make([]internal.Buffer, 0)
+       pqw := &pendingQueueWrapper{
+               pendingQueue:   internal.NewBlockingQueue(1000),
+               writtenBuffers: &writtenBuffers,
+       }
+
+       partitionProducerImp := 
testProducer.(*producer).producers[0].(*partitionProducer)
+       partitionProducerImp.pendingQueue = pqw
+
+       ID, err := testProducer.Send(context.Background(), &ProducerMessage{
+               Payload:            []byte("disable-replication"),
+               DisableReplication: true,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, ID)
+
+       assert.Equal(t, 1, len(writtenBuffers))
+       buf := writtenBuffers[0]
+
+       buf.Skip(4)                        // TOTAL_SIZE
+       cmdSize := buf.ReadUint32()        // CMD_SIZE
+       buf.Skip(cmdSize)                  // CMD
+       buf.Skip(2)                        // MAGIC_NUMBER
+       buf.Skip(4)                        // CHECKSUM
+       metadataSize := buf.ReadUint32()   // METADATA_SIZE
+       metadata := buf.Read(metadataSize) // METADATA
+
+       var msgMetadata pb.MessageMetadata
+       err = proto.Unmarshal(metadata, &msgMetadata)
+       assert.NoError(t, err)
+       assert.Equal(t, []string{"__local__"}, msgMetadata.GetReplicateTo())
+}

Reply via email to