This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ee6330  [Issue 763][producer] Fix deadlock in Producer Send when 
message fails to encode. (#762)
5ee6330 is described below

commit 5ee63303d43e8ee05f2c957a32c5526f29f54a36
Author: samuelhewitt <[email protected]>
AuthorDate: Fri Apr 29 09:33:21 2022 -0400

    [Issue 763][producer] Fix deadlock in Producer Send when message fails to 
encode. (#762)
    
    * Release semaphore and execute callback when message fails to encode.
    Add tests for producer schema encode.
    
    * Use well-defined error code.
    
    Co-authored-by: shewitt <[email protected]>
---
 pulsar/error.go              |  4 ++++
 pulsar/producer_partition.go |  2 ++
 pulsar/producer_test.go      | 44 ++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 50 insertions(+)

diff --git a/pulsar/error.go b/pulsar/error.go
index f433bfc..ead5cf9 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -101,6 +101,8 @@ const (
        SeekFailed
        // ProducerClosed means producer already been closed
        ProducerClosed
+       // SchemaFailure means the payload could not be encoded using the Schema
+       SchemaFailure
 )
 
 // Error implement error interface, composed of two parts: msg and result.
@@ -205,6 +207,8 @@ func getResultStr(r Result) string {
                return "SeekFailed"
        case ProducerClosed:
                return "ProducerClosed"
+       case SchemaFailure:
+               return "SchemaFailure"
        default:
                return fmt.Sprintf("Result(%d)", r)
        }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index bc775e9..43ae68f 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -419,6 +419,8 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
                var schemaPayload []byte
                schemaPayload, err = p.options.Schema.Encode(msg.Value)
                if err != nil {
+                       p.publishSemaphore.Release()
+                       request.callback(nil, request.msg, 
newError(SchemaFailure, err.Error()))
                        p.log.WithError(err).Errorf("Schema encode message 
failed %s", msg.Value)
                        return
                }
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 3c526bb..541c1fe 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -924,6 +924,50 @@ func TestMaxMessageSize(t *testing.T) {
        }
 }
 
+func TestFailedSchemaEncode(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       ctx := context.Background()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:  topic,
+               Schema: NewAvroSchema("{\"type\":\"string\"}", nil),
+       })
+
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       var wg sync.WaitGroup
+       wg.Add(1)
+       go func() {
+               // producer should send return an error as message is Int64, 
but schema is String
+               mid, err := producer.Send(ctx, &ProducerMessage{
+                       Value: int64(1),
+               })
+               assert.NotNil(t, err)
+               assert.Nil(t, mid)
+               wg.Done()
+       }()
+
+       wg.Add(1)
+       // producer should send return an error as message is Int64, but schema 
is String
+       producer.SendAsync(ctx, &ProducerMessage{
+               Value: int64(1),
+       }, func(messageID MessageID, producerMessage *ProducerMessage, err 
error) {
+               assert.NotNil(t, err)
+               assert.Nil(t, messageID)
+               wg.Done()
+       })
+       wg.Wait()
+}
+
 func TestSendTimeout(t *testing.T) {
        quotaURL := adminURL + 
"/admin/v2/namespaces/public/default/backlogQuota"
        quotaFmt := `{"limit": "%d", "policy": "producer_request_hold"}`

Reply via email to