This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new cf3207f  [feat] Support WaitForExclusive producer access mode. (#958)
cf3207f is described below

commit cf3207f4637d80efbcbc0f7646ea326d9f9bac6b
Author: Baodi Shi <[email protected]>
AuthorDate: Mon Feb 20 22:32:39 2023 +0800

    [feat] Support WaitForExclusive producer access mode. (#958)
    
    * [feat] Support WaitForExclusive producer access mode.
    
    * Remove useless defer.
---
 pulsar/internal/connection.go | 17 +++++++++++++++--
 pulsar/internal/rpc_client.go | 22 ++++++++++++++++------
 pulsar/producer.go            |  3 +++
 pulsar/producer_partition.go  |  2 ++
 pulsar/producer_test.go       | 42 ++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 78 insertions(+), 8 deletions(-)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index cb4af33..67b6f32 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -523,8 +523,14 @@ func (c *connection) internalReceivedCommand(cmd 
*pb.BaseCommand, headersAndPayl
                c.handleResponse(cmd.Success.GetRequestId(), cmd)
 
        case pb.BaseCommand_PRODUCER_SUCCESS:
-               c.handleResponse(cmd.ProducerSuccess.GetRequestId(), cmd)
-
+               if !*cmd.ProducerSuccess.ProducerReady {
+                       request, ok := 
c.findPendingRequest(cmd.ProducerSuccess.GetRequestId())
+                       if ok {
+                               request.callback(cmd, nil)
+                       }
+               } else {
+                       c.handleResponse(cmd.ProducerSuccess.GetRequestId(), 
cmd)
+               }
        case pb.BaseCommand_PARTITIONED_METADATA_RESPONSE:
                c.checkServerError(cmd.PartitionMetadataResponse.Error)
                c.handleResponse(cmd.PartitionMetadataResponse.GetRequestId(), 
cmd)
@@ -748,6 +754,13 @@ func (c *connection) deletePendingRequest(requestID 
uint64) (*request, bool) {
        return request, ok
 }
 
+func (c *connection) findPendingRequest(requestID uint64) (*request, bool) {
+       c.pendingLock.Lock()
+       defer c.pendingLock.Unlock()
+       request, ok := c.pendingReqs[requestID]
+       return request, ok
+}
+
 func (c *connection) failPendingRequests(err error) bool {
        c.pendingLock.Lock()
        defer c.pendingLock.Unlock()
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 378ab4f..0ee8ca9 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -129,14 +129,24 @@ func (c *rpcClient) Request(logicalAddr *url.URL, 
physicalAddr *url.URL, request
                        Cnx:      cnx,
                        Response: response,
                }, err}
-               close(ch)
        })
 
-       select {
-       case res := <-ch:
-               return res.RPCResult, res.error
-       case <-time.After(c.requestTimeout):
-               return nil, ErrRequestTimeOut
+       timeoutCh := time.After(c.requestTimeout)
+       for {
+               select {
+               case res := <-ch:
+                       // Ignoring producer not ready response.
+                       // Continue to wait for the producer to create 
successfully
+                       if res.error == nil && *res.RPCResult.Response.Type == 
pb.BaseCommand_PRODUCER_SUCCESS {
+                               if 
!*res.RPCResult.Response.ProducerSuccess.ProducerReady {
+                                       timeoutCh = nil
+                                       break
+                               }
+                       }
+                       return res.RPCResult, res.error
+               case <-timeoutCh:
+                       return nil, ErrRequestTimeOut
+               }
        }
 }
 
diff --git a/pulsar/producer.go b/pulsar/producer.go
index d9ac34b..8fcb891 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -64,6 +64,9 @@ const (
        // ProducerAccessModeExclusive is required exclusive access for 
producer.
        // Fail immediately if there's already a producer connected.
        ProducerAccessModeExclusive
+
+       // ProducerAccessModeWaitForExclusive is pending until producer can 
acquire exclusive access.
+       ProducerAccessModeWaitForExclusive
 )
 
 // TopicMetadata represents a topic metadata.
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index eece055..160693c 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1367,6 +1367,8 @@ func toProtoProducerAccessMode(accessMode 
ProducerAccessMode) pb.ProducerAccessM
                return pb.ProducerAccessMode_Shared
        case ProducerAccessModeExclusive:
                return pb.ProducerAccessMode_Exclusive
+       case ProducerAccessModeWaitForExclusive:
+               return pb.ProducerAccessMode_WaitForExclusive
        }
 
        return pb.ProducerAccessMode_Shared
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index d7950eb..e69a14c 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1737,3 +1737,45 @@ func TestExclusiveProducer(t *testing.T) {
        assert.Error(t, err, "Producer should be failed")
        assert.True(t, strings.Contains(err.Error(), "ProducerBusy"))
 }
+
+func TestWaitForExclusiveProducer(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+               // Set the request timeout is 200ms
+               OperationTimeout: 200 * time.Millisecond,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       topicName := newTopicName()
+       producer1, err := client.CreateProducer(ProducerOptions{
+               Topic:              topicName,
+               ProducerAccessMode: ProducerAccessModeExclusive,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer1)
+
+       wg := sync.WaitGroup{}
+       wg.Add(1)
+       go func() {
+               producer2, err := client.CreateProducer(ProducerOptions{
+                       Topic:              topicName,
+                       ProducerAccessMode: ProducerAccessModeWaitForExclusive,
+               })
+               defer producer2.Close()
+               assert.NoError(t, err)
+               assert.NotNil(t, producer2)
+
+               id, err := producer2.Send(context.Background(), 
&ProducerMessage{
+                       Payload: make([]byte, 1024),
+               })
+               assert.Nil(t, err)
+               assert.NotNil(t, id)
+               wg.Done()
+       }()
+       // Because set the request timeout is 200ms before.
+       // Here waite 300ms to cover wait for exclusive producer never timeout
+       time.Sleep(300 * time.Millisecond)
+       producer1.Close()
+       wg.Wait()
+}

Reply via email to