merlimat closed pull request #2527: [go] Ensure producer/consumer/reader keep a 
ref of client instance so it won't be finalized
URL: https://github.com/apache/incubator-pulsar/pull/2527
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was opened from a fork, GitHub does not keep
its diff visible after the merge, so the full diff is reproduced below:

diff --git a/pulsar-client-go/pulsar/c_consumer.go 
b/pulsar-client-go/pulsar/c_consumer.go
index 1b41a71a49..c78a58eb87 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -32,6 +32,7 @@ import (
 )
 
 type consumer struct {
+       client         *client
        ptr            *C.pulsar_consumer_t
        defaultChannel chan ConsumerMessage
 }
@@ -76,7 +77,7 @@ func subscribeAsync(client *client, options ConsumerOptions, 
callback func(Consu
 
        conf := C.pulsar_consumer_configuration_create()
 
-       consumer := &consumer{}
+       consumer := &consumer{client: client}
 
        if options.MessageChannel == nil {
                // If there is no message listener, set a default channel so 
that we can have receive to
diff --git a/pulsar-client-go/pulsar/c_producer.go 
b/pulsar-client-go/pulsar/c_producer.go
index 284315dbb2..620b64d8b0 100644
--- a/pulsar-client-go/pulsar/c_producer.go
+++ b/pulsar-client-go/pulsar/c_producer.go
@@ -24,13 +24,14 @@ package pulsar
 */
 import "C"
 import (
+       "context"
        "runtime"
-       "unsafe"
        "time"
-       "context"
+       "unsafe"
 )
 
 type createProducerCtx struct {
+       client   *client
        callback func(producer Producer, err error)
        conf     *C.pulsar_producer_configuration_t
 }
@@ -44,7 +45,7 @@ func pulsarCreateProducerCallbackProxy(res C.pulsar_result, 
ptr *C.pulsar_produc
        if res != C.pulsar_result_Ok {
                producerCtx.callback(nil, newError(res, "Failed to create 
Producer"))
        } else {
-               p := &producer{ptr: ptr}
+               p := &producer{client: producerCtx.client, ptr: ptr}
                runtime.SetFinalizer(p, producerFinalizer)
                producerCtx.callback(p, nil)
        }
@@ -140,7 +141,7 @@ func createProducerAsync(client *client, options 
ProducerOptions, callback func(
        defer C.free(unsafe.Pointer(topicName))
 
        C._pulsar_client_create_producer_async(client.ptr, topicName, conf,
-               savePointer(createProducerCtx{callback, conf}))
+               savePointer(createProducerCtx{client,callback, conf}))
 }
 
 type topicMetadata struct {
@@ -161,7 +162,8 @@ func pulsarRouterCallbackProxy(msg *C.pulsar_message_t, 
metadata *C.pulsar_topic
 /// Producer
 
 type producer struct {
-       ptr *C.pulsar_producer_t
+       client *client
+       ptr    *C.pulsar_producer_t
 }
 
 func producerFinalizer(p *producer) {
diff --git a/pulsar-client-go/pulsar/c_reader.go 
b/pulsar-client-go/pulsar/c_reader.go
index 04bb5cf840..7336c1a39b 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -31,6 +31,7 @@ import (
 )
 
 type reader struct {
+       client         *client
        ptr            *C.pulsar_reader_t
        defaultChannel chan ReaderMessage
 }
@@ -73,7 +74,7 @@ func createReaderAsync(client *client, options ReaderOptions, 
callback func(Read
                return
        }
 
-       reader := &reader{}
+       reader := &reader{client: client}
 
        if options.MessageChannel == nil {
                // If there is no message listener, set a default channel so 
that we can have receive to


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to