This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a28e21c [Producer] respect context cancellation in Flush (#1165)
2a28e21c is described below

commit 2a28e21c59d005515e118fed5bf8f333d6699e39
Author: Jayant <[email protected]>
AuthorDate: Thu Feb 1 21:52:52 2024 -0500

    [Producer] respect context cancellation in Flush (#1165)
    
    ### Motivation
    
    The producer's `Flush` method does not respect context cancellation. If the 
caller's context gets cancelled, it will have to wait for the producer to 
finish flushing.
    
    ### Modifications
    
    This change adds a `FlushWithCtx` method which takes a context and selects 
on two channels.
---
 pulsar/consumer_test.go                            |  6 +++---
 .../pulsartracing/producer_interceptor_test.go     |  4 ++++
 pulsar/producer.go                                 |  7 +++++--
 pulsar/producer_impl.go                            |  6 +++++-
 pulsar/producer_partition.go                       | 18 +++++++++++++---
 pulsar/producer_test.go                            | 24 +++++++++++-----------
 pulsar/reader_test.go                              |  6 +++---
 7 files changed, 47 insertions(+), 24 deletions(-)

diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index df70b0dd..d66e2376 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -983,7 +983,7 @@ func TestConsumerBatchCumulativeAck(t *testing.T) {
        }
        wg.Wait()
 
-       err = producer.Flush()
+       err = producer.FlushWithCtx(context.Background())
        assert.NoError(t, err)
 
        // send another batch
@@ -1218,7 +1218,7 @@ func TestConsumerCompressionWithBatches(t *testing.T) {
                }, nil)
        }
 
-       producer.Flush()
+       producer.FlushWithCtx(context.Background())
 
        for i := 0; i < N; i++ {
                msg, err := consumer.Receive(ctx)
@@ -3932,7 +3932,7 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse 
bool, cumulative bool, o
                        log.Printf("Sent to %v:%d:%d", id, id.BatchIdx(), 
id.BatchSize())
                })
        }
-       assert.Nil(t, producer.Flush())
+       assert.Nil(t, producer.FlushWithCtx(context.Background()))
 
        msgIds := make([]MessageID, BatchingMaxSize)
        for i := 0; i < BatchingMaxSize; i++ {
diff --git a/pulsar/internal/pulsartracing/producer_interceptor_test.go 
b/pulsar/internal/pulsartracing/producer_interceptor_test.go
index 8d8e6965..1c2c712f 100644
--- a/pulsar/internal/pulsartracing/producer_interceptor_test.go
+++ b/pulsar/internal/pulsartracing/producer_interceptor_test.go
@@ -67,4 +67,8 @@ func (p *mockProducer) Flush() error {
        return nil
 }
 
+func (p *mockProducer) FlushWithCtx(ctx context.Context) error {
+       return nil
+}
+
 func (p *mockProducer) Close() {}
diff --git a/pulsar/producer.go b/pulsar/producer.go
index 70d152c7..f8013a16 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -237,10 +237,13 @@ type Producer interface {
        // return the last sequence id published by this producer.
        LastSequenceID() int64
 
-       // Flush all the messages buffered in the client and wait until all 
messages have been successfully
-       // persisted.
+       // Deprecated: Use `FlushWithCtx()` instead.
        Flush() error
 
+       // Flush all the messages buffered in the client and wait until all 
messages have been successfully
+       // persisted.
+       FlushWithCtx(ctx context.Context) error
+
        // Close the producer and releases resources allocated
        // No more writes will be accepted from this producer. Waits until all 
pending write request are persisted. In case
        // of errors, pending writes will not be retried.
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 3c45b597..ca923108 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -334,11 +334,15 @@ func (p *producer) LastSequenceID() int64 {
 }
 
 func (p *producer) Flush() error {
+       return p.FlushWithCtx(context.Background())
+}
+
+func (p *producer) FlushWithCtx(ctx context.Context) error {
        p.RLock()
        defer p.RUnlock()
 
        for _, pp := range p.producers {
-               if err := pp.Flush(); err != nil {
+               if err := pp.FlushWithCtx(ctx); err != nil {
                        return err
                }
 
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 1b79053e..fbcc5b97 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1422,15 +1422,27 @@ func (p *partitionProducer) LastSequenceID() int64 {
 }
 
 func (p *partitionProducer) Flush() error {
+       return p.FlushWithCtx(context.Background())
+}
+
+func (p *partitionProducer) FlushWithCtx(ctx context.Context) error {
        flushReq := &flushRequest{
                doneCh: make(chan struct{}),
                err:    nil,
        }
-       p.cmdChan <- flushReq
+       select {
+       case <-ctx.Done():
+               return ctx.Err()
+       case p.cmdChan <- flushReq:
+       }
 
        // wait for the flush request to complete
-       <-flushReq.doneCh
-       return flushReq.err
+       select {
+       case <-ctx.Done():
+               return ctx.Err()
+       case <-flushReq.doneCh:
+               return flushReq.err
+       }
 }
 
 func (p *partitionProducer) getProducerState() producerState {
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 3b9ea7e8..ba591156 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -159,7 +159,7 @@ func TestProducerAsyncSend(t *testing.T) {
                assert.NoError(t, err)
        }
 
-       err = producer.Flush()
+       err = producer.FlushWithCtx(context.Background())
        assert.Nil(t, err)
 
        wg.Wait()
@@ -220,7 +220,7 @@ func TestProducerFlushDisableBatching(t *testing.T) {
                assert.NoError(t, err)
        }
 
-       err = producer.Flush()
+       err = producer.FlushWithCtx(context.Background())
        assert.Nil(t, err)
 
        wg.Wait()
@@ -387,7 +387,7 @@ func TestFlushInProducer(t *testing.T) {
                })
                assert.Nil(t, err)
        }
-       err = producer.Flush()
+       err = producer.FlushWithCtx(context.Background())
        assert.Nil(t, err)
        wg.Wait()
 
@@ -429,7 +429,7 @@ func TestFlushInProducer(t *testing.T) {
                assert.Nil(t, err)
        }
 
-       err = producer.Flush()
+       err = producer.FlushWithCtx(context.Background())
        assert.Nil(t, err)
        wg.Wait()
 
@@ -500,7 +500,7 @@ func TestFlushInPartitionedProducer(t *testing.T) {
        }
 
        // After flush, should be able to consume.
-       err = producer.Flush()
+       err = producer.FlushWithCtx(context.Background())
        assert.Nil(t, err)
 
        wg.Wait()
@@ -1717,7 +1717,7 @@ func TestMultipleSchemaOfKeyBasedBatchProducerConsumer(t 
*testing.T) {
                }
 
        }
-       producer.Flush()
+       producer.FlushWithCtx(context.Background())
 
        //// create consumer
        consumer, err := client.Subscribe(ConsumerOptions{
@@ -1808,7 +1808,7 @@ func TestMultipleSchemaProducerConsumer(t *testing.T) {
                        assert.NotNil(t, id)
                })
        }
-       producer.Flush()
+       producer.FlushWithCtx(context.Background())
 
        //// create consumer
        consumer, err := client.Subscribe(ConsumerOptions{
@@ -2027,9 +2027,9 @@ func TestMemLimitRejectProducerMessages(t *testing.T) {
        assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
 
        // flush pending msg
-       err = producer1.Flush()
+       err = producer1.FlushWithCtx(context.Background())
        assert.NoError(t, err)
-       err = producer2.Flush()
+       err = producer2.FlushWithCtx(context.Background())
        assert.NoError(t, err)
        assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())
 
@@ -2118,9 +2118,9 @@ func TestMemLimitRejectProducerMessagesWithSchema(t 
*testing.T) {
        assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))
 
        // flush pending msg
-       err = producer1.Flush()
+       err = producer1.FlushWithCtx(context.Background())
        assert.NoError(t, err)
-       err = producer2.Flush()
+       err = producer2.FlushWithCtx(context.Background())
        assert.NoError(t, err)
        assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())
 
@@ -2244,7 +2244,7 @@ func TestMemLimitContextCancel(t *testing.T) {
        cancel()
        wg.Wait()
 
-       err = producer.Flush()
+       err = producer.FlushWithCtx(context.Background())
        assert.NoError(t, err)
        assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())
 
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index ec10f8f1..c8228a7c 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -277,7 +277,7 @@ func TestReaderOnSpecificMessageWithBatching(t *testing.T) {
                })
        }
 
-       err = producer.Flush()
+       err = producer.FlushWithCtx(context.Background())
        assert.NoError(t, err)
 
        // create reader on 5th message (not included)
@@ -353,7 +353,7 @@ func TestReaderOnLatestWithBatching(t *testing.T) {
                })
        }
 
-       err = producer.Flush()
+       err = producer.FlushWithCtx(context.Background())
        assert.NoError(t, err)
 
        // create reader on 5th message (not included)
@@ -592,7 +592,7 @@ func TestReaderSeek(t *testing.T) {
                        seekID = id
                }
        }
-       err = producer.Flush()
+       err = producer.FlushWithCtx(context.Background())
        assert.NoError(t, err)
 
        for i := 0; i < N; i++ {

Reply via email to