This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3ffcf61 [go] Ensure producer/consumer/reader keep a ref of client instance so it won't be finalized (#2527)
3ffcf61 is described below
commit 3ffcf61c45be5d64c1f6ec538cdd38cfe0d07c2b
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Sep 6 07:40:24 2018 -0700
[go] Ensure producer/consumer/reader keep a ref of client instance so it won't be finalized (#2527)
---
pulsar-client-go/pulsar/c_consumer.go | 3 ++-
pulsar-client-go/pulsar/c_producer.go | 12 +++++++-----
pulsar-client-go/pulsar/c_reader.go | 3 ++-
3 files changed, 11 insertions(+), 7 deletions(-)
diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index 1b41a71..c78a58e 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -32,6 +32,7 @@ import (
)
type consumer struct {
+ client *client
ptr *C.pulsar_consumer_t
defaultChannel chan ConsumerMessage
}
@@ -76,7 +77,7 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
conf := C.pulsar_consumer_configuration_create()
- consumer := &consumer{}
+ consumer := &consumer{client: client}
if options.MessageChannel == nil {
// If there is no message listener, set a default channel so that we can have receive to
diff --git a/pulsar-client-go/pulsar/c_producer.go b/pulsar-client-go/pulsar/c_producer.go
index 284315d..620b64d 100644
--- a/pulsar-client-go/pulsar/c_producer.go
+++ b/pulsar-client-go/pulsar/c_producer.go
@@ -24,13 +24,14 @@ package pulsar
*/
import "C"
import (
+ "context"
"runtime"
- "unsafe"
"time"
- "context"
+ "unsafe"
)
type createProducerCtx struct {
+ client *client
callback func(producer Producer, err error)
conf *C.pulsar_producer_configuration_t
}
@@ -44,7 +45,7 @@ func pulsarCreateProducerCallbackProxy(res C.pulsar_result, ptr *C.pulsar_produc
if res != C.pulsar_result_Ok {
producerCtx.callback(nil, newError(res, "Failed to create Producer"))
} else {
- p := &producer{ptr: ptr}
+ p := &producer{client: producerCtx.client, ptr: ptr}
runtime.SetFinalizer(p, producerFinalizer)
producerCtx.callback(p, nil)
}
@@ -140,7 +141,7 @@ func createProducerAsync(client *client, options ProducerOptions, callback func(
defer C.free(unsafe.Pointer(topicName))
C._pulsar_client_create_producer_async(client.ptr, topicName, conf,
- savePointer(createProducerCtx{callback, conf}))
+ savePointer(createProducerCtx{client,callback, conf}))
}
type topicMetadata struct {
@@ -161,7 +162,8 @@ func pulsarRouterCallbackProxy(msg *C.pulsar_message_t, metadata *C.pulsar_topic
/// Producer
type producer struct {
- ptr *C.pulsar_producer_t
+ client *client
+ ptr *C.pulsar_producer_t
}
func producerFinalizer(p *producer) {
diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go
index 04bb5cf..7336c1a 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -31,6 +31,7 @@ import (
)
type reader struct {
+ client *client
ptr *C.pulsar_reader_t
defaultChannel chan ReaderMessage
}
@@ -73,7 +74,7 @@ func createReaderAsync(client *client, options ReaderOptions, callback func(Read
return
}
- reader := &reader{}
+ reader := &reader{client: client}
if options.MessageChannel == nil {
// If there is no message listener, set a default channel so that we can have receive to