This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new baaf68d  Fix producer panic by oldProducers (#598)
baaf68d is described below

commit baaf68d89bc82ab83b714c0327ea51d58d9bd37f
Author: xiaolong ran <[email protected]>
AuthorDate: Wed Aug 25 14:54:31 2021 +0800

    Fix producer panic by oldProducers (#598)
    
    * Fix producer panic by oldProducers
    
    Signed-off-by: xiaolongran <[email protected]>
    
    * fix comments
    
    Signed-off-by: xiaolongran <[email protected]>
    
    * fix comments
    
    Signed-off-by: xiaolongran <[email protected]>
    
    * fix ci error
    
    Signed-off-by: xiaolongran <[email protected]>
---
 pulsar/consumer_impl.go | 16 +++++++++++++---
 pulsar/producer_impl.go | 17 ++++++++++++++---
 2 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index b7bc607..ec7ad7d 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -28,6 +28,7 @@ import (
        "github.com/apache/pulsar-client-go/pulsar/internal"
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/apache/pulsar-client-go/pulsar/log"
+       "github.com/pkg/errors"
 )
 
 const defaultNackRedeliveryDelay = 1 * time.Minute
@@ -266,6 +267,13 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
                        return nil
                }
 
+               if oldNumPartitions > newNumPartitions {
+                       c.log.WithField("old_partitions", oldNumPartitions).
+                               WithField("new_partitions", newNumPartitions).
+                               Error("Does not support scaling down operations on topic partitions")
+                       return errors.New("Does not support scaling down operations on topic partitions")
+               }
+
                c.log.WithField("old_partitions", oldNumPartitions).
                        WithField("new_partitions", newNumPartitions).
                        Info("Changed number of partitions in topic")
@@ -273,9 +281,11 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 
        c.consumers = make([]*partitionConsumer, newNumPartitions)
 
-       // Copy over the existing consumer instances
-       for i := 0; i < oldNumPartitions; i++ {
-               c.consumers[i] = oldConsumers[i]
+       if oldConsumers != nil {
+               // Copy over the existing consumer instances
+               for i := 0; i < oldNumPartitions; i++ {
+                       c.consumers[i] = oldConsumers[i]
+               }
        }
 
        type ConsumerError struct {
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 1ffd24c..adf9b14 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -26,6 +26,7 @@ import (
 
        "github.com/apache/pulsar-client-go/pulsar/internal"
        "github.com/apache/pulsar-client-go/pulsar/log"
+       "github.com/pkg/errors"
 )
 
 const (
@@ -182,16 +183,26 @@ func (p *producer) internalCreatePartitionsProducers() error {
                        return nil
                }
 
+               if oldNumPartitions > newNumPartitions {
+                       p.log.WithField("old_partitions", oldNumPartitions).
+                               WithField("new_partitions", newNumPartitions).
+                               Error("Does not support scaling down operations on topic partitions")
+                       return errors.New("Does not support scaling down operations on topic partitions")
+               }
+
                p.log.WithField("old_partitions", oldNumPartitions).
                        WithField("new_partitions", newNumPartitions).
                        Info("Changed number of partitions in topic")
+
        }
 
        p.producers = make([]Producer, newNumPartitions)
 
-       // Copy over the existing consumer instances
-       for i := 0; i < oldNumPartitions; i++ {
-               p.producers[i] = oldProducers[i]
+       if oldProducers != nil {
+               // Copy over the existing producer instances
+               for i := 0; i < oldNumPartitions; i++ {
+                       p.producers[i] = oldProducers[i]
+               }
        }
 
        type ProducerError struct {

Reply via email to