This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 2a28e21c [Producer] respect context cancellation in Flush (#1165)
2a28e21c is described below
commit 2a28e21c59d005515e118fed5bf8f333d6699e39
Author: Jayant <[email protected]>
AuthorDate: Thu Feb 1 21:52:52 2024 -0500
[Producer] respect context cancellation in Flush (#1165)
### Motivation
The producer's `Flush` method does not respect context cancellation. If the
caller's context is cancelled, the caller still has to wait for the producer
to finish flushing.
### Modifications
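This change adds a `FlushWithCtx` method which takes a context and selects
on the context's done channel alongside the flush request's channels, so a
cancelled context returns `ctx.Err()` instead of blocking. `Flush` is
deprecated and now delegates to `FlushWithCtx(context.Background())`.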
---
pulsar/consumer_test.go | 6 +++---
.../pulsartracing/producer_interceptor_test.go | 4 ++++
pulsar/producer.go | 7 +++++--
pulsar/producer_impl.go | 6 +++++-
pulsar/producer_partition.go | 18 +++++++++++++---
pulsar/producer_test.go | 24 +++++++++++-----------
pulsar/reader_test.go | 6 +++---
7 files changed, 47 insertions(+), 24 deletions(-)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index df70b0dd..d66e2376 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -983,7 +983,7 @@ func TestConsumerBatchCumulativeAck(t *testing.T) {
}
wg.Wait()
- err = producer.Flush()
+ err = producer.FlushWithCtx(context.Background())
assert.NoError(t, err)
// send another batch
@@ -1218,7 +1218,7 @@ func TestConsumerCompressionWithBatches(t *testing.T) {
}, nil)
}
- producer.Flush()
+ producer.FlushWithCtx(context.Background())
for i := 0; i < N; i++ {
msg, err := consumer.Receive(ctx)
@@ -3932,7 +3932,7 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse
bool, cumulative bool, o
log.Printf("Sent to %v:%d:%d", id, id.BatchIdx(),
id.BatchSize())
})
}
- assert.Nil(t, producer.Flush())
+ assert.Nil(t, producer.FlushWithCtx(context.Background()))
msgIds := make([]MessageID, BatchingMaxSize)
for i := 0; i < BatchingMaxSize; i++ {
diff --git a/pulsar/internal/pulsartracing/producer_interceptor_test.go
b/pulsar/internal/pulsartracing/producer_interceptor_test.go
index 8d8e6965..1c2c712f 100644
--- a/pulsar/internal/pulsartracing/producer_interceptor_test.go
+++ b/pulsar/internal/pulsartracing/producer_interceptor_test.go
@@ -67,4 +67,8 @@ func (p *mockProducer) Flush() error {
return nil
}
+func (p *mockProducer) FlushWithCtx(ctx context.Context) error {
+ return nil
+}
+
func (p *mockProducer) Close() {}
diff --git a/pulsar/producer.go b/pulsar/producer.go
index 70d152c7..f8013a16 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -237,10 +237,13 @@ type Producer interface {
// return the last sequence id published by this producer.
LastSequenceID() int64
- // Flush all the messages buffered in the client and wait until all
messages have been successfully
- // persisted.
+ // Deprecated: Use `FlushWithCtx()` instead.
Flush() error
+ // Flush all the messages buffered in the client and wait until all
messages have been successfully
+ // persisted.
+ FlushWithCtx(ctx context.Context) error
+
// Close the producer and releases resources allocated
// No more writes will be accepted from this producer. Waits until all
pending write request are persisted. In case
// of errors, pending writes will not be retried.
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 3c45b597..ca923108 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -334,11 +334,15 @@ func (p *producer) LastSequenceID() int64 {
}
func (p *producer) Flush() error {
+ return p.FlushWithCtx(context.Background())
+}
+
+func (p *producer) FlushWithCtx(ctx context.Context) error {
p.RLock()
defer p.RUnlock()
for _, pp := range p.producers {
- if err := pp.Flush(); err != nil {
+ if err := pp.FlushWithCtx(ctx); err != nil {
return err
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 1b79053e..fbcc5b97 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1422,15 +1422,27 @@ func (p *partitionProducer) LastSequenceID() int64 {
}
func (p *partitionProducer) Flush() error {
+ return p.FlushWithCtx(context.Background())
+}
+
+func (p *partitionProducer) FlushWithCtx(ctx context.Context) error {
flushReq := &flushRequest{
doneCh: make(chan struct{}),
err: nil,
}
- p.cmdChan <- flushReq
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case p.cmdChan <- flushReq:
+ }
// wait for the flush request to complete
- <-flushReq.doneCh
- return flushReq.err
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-flushReq.doneCh:
+ return flushReq.err
+ }
}
func (p *partitionProducer) getProducerState() producerState {
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 3b9ea7e8..ba591156 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -159,7 +159,7 @@ func TestProducerAsyncSend(t *testing.T) {
assert.NoError(t, err)
}
- err = producer.Flush()
+ err = producer.FlushWithCtx(context.Background())
assert.Nil(t, err)
wg.Wait()
@@ -220,7 +220,7 @@ func TestProducerFlushDisableBatching(t *testing.T) {
assert.NoError(t, err)
}
- err = producer.Flush()
+ err = producer.FlushWithCtx(context.Background())
assert.Nil(t, err)
wg.Wait()
@@ -387,7 +387,7 @@ func TestFlushInProducer(t *testing.T) {
})
assert.Nil(t, err)
}
- err = producer.Flush()
+ err = producer.FlushWithCtx(context.Background())
assert.Nil(t, err)
wg.Wait()
@@ -429,7 +429,7 @@ func TestFlushInProducer(t *testing.T) {
assert.Nil(t, err)
}
- err = producer.Flush()
+ err = producer.FlushWithCtx(context.Background())
assert.Nil(t, err)
wg.Wait()
@@ -500,7 +500,7 @@ func TestFlushInPartitionedProducer(t *testing.T) {
}
// After flush, should be able to consume.
- err = producer.Flush()
+ err = producer.FlushWithCtx(context.Background())
assert.Nil(t, err)
wg.Wait()
@@ -1717,7 +1717,7 @@ func TestMultipleSchemaOfKeyBasedBatchProducerConsumer(t
*testing.T) {
}
}
- producer.Flush()
+ producer.FlushWithCtx(context.Background())
//// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
@@ -1808,7 +1808,7 @@ func TestMultipleSchemaProducerConsumer(t *testing.T) {
assert.NotNil(t, id)
})
}
- producer.Flush()
+ producer.FlushWithCtx(context.Background())
//// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
@@ -2027,9 +2027,9 @@ func TestMemLimitRejectProducerMessages(t *testing.T) {
assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
// flush pending msg
- err = producer1.Flush()
+ err = producer1.FlushWithCtx(context.Background())
assert.NoError(t, err)
- err = producer2.Flush()
+ err = producer2.FlushWithCtx(context.Background())
assert.NoError(t, err)
assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())
@@ -2118,9 +2118,9 @@ func TestMemLimitRejectProducerMessagesWithSchema(t
*testing.T) {
assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
// flush pending msg
- err = producer1.Flush()
+ err = producer1.FlushWithCtx(context.Background())
assert.NoError(t, err)
- err = producer2.Flush()
+ err = producer2.FlushWithCtx(context.Background())
assert.NoError(t, err)
assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())
@@ -2244,7 +2244,7 @@ func TestMemLimitContextCancel(t *testing.T) {
cancel()
wg.Wait()
- err = producer.Flush()
+ err = producer.FlushWithCtx(context.Background())
assert.NoError(t, err)
assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index ec10f8f1..c8228a7c 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -277,7 +277,7 @@ func TestReaderOnSpecificMessageWithBatching(t *testing.T) {
})
}
- err = producer.Flush()
+ err = producer.FlushWithCtx(context.Background())
assert.NoError(t, err)
// create reader on 5th message (not included)
@@ -353,7 +353,7 @@ func TestReaderOnLatestWithBatching(t *testing.T) {
})
}
- err = producer.Flush()
+ err = producer.FlushWithCtx(context.Background())
assert.NoError(t, err)
// create reader on 5th message (not included)
@@ -592,7 +592,7 @@ func TestReaderSeek(t *testing.T) {
seekID = id
}
}
- err = producer.Flush()
+ err = producer.FlushWithCtx(context.Background())
assert.NoError(t, err)
for i := 0; i < N; i++ {