This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new baaf68d Fix producer panic by oldProducers (#598)
baaf68d is described below
commit baaf68d89bc82ab83b714c0327ea51d58d9bd37f
Author: xiaolong ran <[email protected]>
AuthorDate: Wed Aug 25 14:54:31 2021 +0800
Fix producer panic by oldProducers (#598)
* Fix producer panic by oldProducers
Signed-off-by: xiaolongran <[email protected]>
* fix comments
Signed-off-by: xiaolongran <[email protected]>
* fix comments
Signed-off-by: xiaolongran <[email protected]>
* fix ci error
Signed-off-by: xiaolongran <[email protected]>
---
pulsar/consumer_impl.go | 16 +++++++++++++---
pulsar/producer_impl.go | 17 ++++++++++++++---
2 files changed, 27 insertions(+), 6 deletions(-)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index b7bc607..ec7ad7d 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
+ "github.com/pkg/errors"
)
const defaultNackRedeliveryDelay = 1 * time.Minute
@@ -266,6 +267,13 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
return nil
}
+ if oldNumPartitions > newNumPartitions {
+ c.log.WithField("old_partitions", oldNumPartitions).
+ WithField("new_partitions", newNumPartitions).
+ Error("Does not support scaling down operations on topic partitions")
+ return errors.New("Does not support scaling down operations on topic partitions")
+ }
+
c.log.WithField("old_partitions", oldNumPartitions).
WithField("new_partitions", newNumPartitions).
Info("Changed number of partitions in topic")
@@ -273,9 +281,11 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
c.consumers = make([]*partitionConsumer, newNumPartitions)
- // Copy over the existing consumer instances
- for i := 0; i < oldNumPartitions; i++ {
- c.consumers[i] = oldConsumers[i]
+ if oldConsumers != nil {
+ // Copy over the existing consumer instances
+ for i := 0; i < oldNumPartitions; i++ {
+ c.consumers[i] = oldConsumers[i]
+ }
}
type ConsumerError struct {
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 1ffd24c..adf9b14 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/log"
+ "github.com/pkg/errors"
)
const (
@@ -182,16 +183,26 @@ func (p *producer) internalCreatePartitionsProducers() error {
return nil
}
+ if oldNumPartitions > newNumPartitions {
+ p.log.WithField("old_partitions", oldNumPartitions).
+ WithField("new_partitions", newNumPartitions).
+ Error("Does not support scaling down operations on topic partitions")
+ return errors.New("Does not support scaling down operations on topic partitions")
+ }
+
p.log.WithField("old_partitions", oldNumPartitions).
WithField("new_partitions", newNumPartitions).
Info("Changed number of partitions in topic")
+
}
p.producers = make([]Producer, newNumPartitions)
- // Copy over the existing consumer instances
- for i := 0; i < oldNumPartitions; i++ {
- p.producers[i] = oldProducers[i]
+ if oldProducers != nil {
+ // Copy over the existing consumer instances
+ for i := 0; i < oldNumPartitions; i++ {
+ p.producers[i] = oldProducers[i]
+ }
}
type ProducerError struct {