This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 1dfb8fd Add producer check state before send msg. (#569)
1dfb8fd is described below
commit 1dfb8fdffb97d0c4f4845cc668bed485324daf5c
Author: Zhiqiang Li <[email protected]>
AuthorDate: Wed Jul 21 17:43:23 2021 +0800
Add producer check state before send msg. (#569)
Add producer state check before send msg.
---
pulsar/error.go | 4 ++++
pulsar/producer_partition.go | 7 +++++++
pulsar/producer_test.go | 30 ++++++++++++++++++++++++++++++
3 files changed, 41 insertions(+)
diff --git a/pulsar/error.go b/pulsar/error.go
index 60a832b..f433bfc 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -99,6 +99,8 @@ const (
AddToBatchFailed
// SeekFailed seek failed
SeekFailed
+ // ProducerClosed means producer already been closed
+ ProducerClosed
)
// Error implement error interface, composed of two parts: msg and result.
@@ -201,6 +203,8 @@ func getResultStr(r Result) string {
return "AddToBatchFailed"
case SeekFailed:
return "SeekFailed"
+ case ProducerClosed:
+ return "ProducerClosed"
default:
return fmt.Sprintf("Result(%d)", r)
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 8b3d33d..7e83bfa 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -50,6 +50,7 @@ var (
errSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full")
errContextExpired = newError(TimeoutError, "message send context expired")
errMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize")
+ errProducerClosed = newError(ProducerClosed, "producer already been closed")
buffersPool sync.Pool
)
@@ -658,6 +659,12 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
+ if p.getProducerState() != producerReady {
+ // Producer is closing
+ callback(nil, msg, errProducerClosed)
+ return
+ }
+
sr := &sendRequest{
ctx: ctx,
msg: msg,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 7c3dbd7..bbe8028 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1097,3 +1097,33 @@ func TestProducerWithInterceptors(t *testing.T) {
assert.Equal(t, 10, metric.sendn)
assert.Equal(t, 10, metric.ackn)
}
+
+func TestProducerSendAfterClose(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: newTopicName(),
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+ defer producer.Close()
+
+ ID, err := producer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte("hello"),
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, ID)
+
+ producer.Close()
+ ID, err = producer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte("hello"),
+ })
+ assert.Nil(t, ID)
+ assert.Error(t, err)
+}