This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 49b7a3c [Issue:30] Support seek logic and add test case (#56) 49b7a3c is described below commit 49b7a3c43807aae16ef117a2370484dfe46d965e Author: 冉小龙 <ranxiaolong...@gmail.com> AuthorDate: Tue Aug 20 16:06:09 2019 +0800 [Issue:30] Support seek logic and add test case (#56) * [Issue:30] Support seek logic and add test case Signed-off-by: xiaolong.ran <ranxiaolong...@gmail.com> * fix a little Signed-off-by: xiaolong.ran <ranxiaolong...@gmail.com> --- pulsar/consumer_test.go | 68 +++++++++++++++++++++++++++++++++++++++ pulsar/impl_consumer.go | 8 ++--- pulsar/impl_partition_consumer.go | 17 ++++------ pulsar/impl_partition_producer.go | 2 +- pulsar/internal/connection.go | 33 +++++++++++++++++-- pulsar/producer.go | 2 +- 6 files changed, 111 insertions(+), 19 deletions(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 97b4264..f5e9ba3 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -722,3 +722,71 @@ func TestConsumer_Shared(t *testing.T) { res := util.RemoveDuplicateElement(msgList) assert.Equal(t, 10, len(res)) } + +func TestConsumer_Seek(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topicName := "persistent://public/default/testSeek" + testURL := adminURL + "/" + "admin/v2/persistent/public/default/testSeek" + makeHTTPCall(t, http.MethodPut, testURL, "1") + subName := "sub-testSeek" + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + }) + assert.Nil(t, err) + assert.Equal(t, producer.Topic(), topicName) + defer producer.Close() + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topicName, + SubscriptionName: subName, + }) + assert.Nil(t, err) + assert.Equal(t, consumer.Topic(), topicName) + assert.Equal(t, consumer.Subscription(), subName) + defer consumer.Close() + + ctx := context.Background() + + // Send 10 messages synchronously + t.Log("Publishing 10 messages synchronously") + for msgNum := 0; msgNum < 10; msgNum++ { + if err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("msg-content-%d", msgNum)), + }); err != nil { + t.Fatal(err) + } + } + + t.Log("Trying to receive 10 messages") + idList := make([]MessageID, 0, 10) + for msgNum := 0; msgNum < 10; msgNum++ { + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + idList = append(idList, msg.ID()) + fmt.Println(string(msg.Payload())) + } + + for index, id := range idList { + if index == 4 { + // seek to fourth message, expected receive fourth message. + err = consumer.Seek(id) + assert.Nil(t, err) + break + } + } + + // Sleeping for 500ms to wait for consumer re-connect + time.Sleep(500 * time.Millisecond) + + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + t.Logf("again received message:%+v", msg.ID()) + assert.Equal(t, "msg-content-4", string(msg.Payload())) +} diff --git a/pulsar/impl_consumer.go b/pulsar/impl_consumer.go index 1f800b5..dcc1df8 100644 --- a/pulsar/impl_consumer.go +++ b/pulsar/impl_consumer.go @@ -268,10 +268,10 @@ func (c *consumer) Seek(msgID MessageID) error { partition := id.GetPartition() - // current topic is non-partition topic, we only need to get the first value in the consumers. - if partition < 0 { - partition = 0 - } + // current topic is non-partition topic, we only need to get the first value in the consumers. + if partition < 0 { + partition = 0 + } return c.consumers[partition].Seek(msgID) } diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go index 3a729cd..4e29d25 100644 --- a/pulsar/impl_partition_consumer.go +++ b/pulsar/impl_partition_consumer.go @@ -80,7 +80,7 @@ func newPartitionConsumer(client *client, topic string, options *ConsumerOptions consumerID: client.rpcClient.NewConsumerID(), partitionIdx: partitionID, partitionNum: partitionNum, - eventsChan: make(chan interface{}), + eventsChan: make(chan interface{}, 1), subQueue: make(chan ConsumerMessage, options.ReceiverQueueSize), } @@ -167,7 +167,7 @@ func (pc *partitionConsumer) grabCnx() error { return err } - pc.log.Debug("Lookup result: ", lr) + pc.log.Infof("Lookup result: %v", lr) requestID := pc.client.rpcClient.NewRequestID() res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID, pb.BaseCommand_SUBSCRIBE, &pb.CommandSubscribe{ @@ -192,16 +192,13 @@ func (pc *partitionConsumer) grabCnx() error { pc.cnx = res.Cnx pc.log.WithField("cnx", res.Cnx).Debug("Connected consumer") + pc.cnx.AddConsumeHandler(pc.consumerID, pc) msgType := res.Response.GetType() switch msgType { case pb.BaseCommand_SUCCESS: - pc.cnx.AddConsumeHandler(pc.consumerID, pc) - if err := pc.internalFlow(uint32(pc.options.ReceiverQueueSize)); err != nil { - return err - } - return nil + return pc.internalFlow(uint32(pc.options.ReceiverQueueSize)) case pb.BaseCommand_ERROR: errMsg := res.Response.GetError() return fmt.Errorf("%s: %s", errMsg.GetError().String(), errMsg.GetMessage()) @@ -593,7 +590,6 @@ func (pc *partitionConsumer) internalClose(req *handlerClose) { pc.log.Info("Closed consumer") pc.state = consumerClosed close(pc.options.MessageChannel) - //pc.cnx.UnregisterListener(pc.consumerID) } req.waitGroup.Done() @@ -711,13 +707,13 @@ type handlerClose struct { type handleConnectionClosed struct{} func (pc *partitionConsumer) ConnectionClosed() { - // Trigger reconnection in the produce goroutine + // Trigger reconnection in the consumer goroutine pc.eventsChan <- &handleConnectionClosed{} } func (pc *partitionConsumer) reconnectToBroker() { pc.log.Info("Reconnecting to broker") - backoff := internal.Backoff{} + backoff := new(internal.Backoff) for { if pc.state != consumerReady { // Consumer is already closing @@ -727,6 +723,7 @@ func (pc *partitionConsumer) reconnectToBroker() { err := pc.grabCnx() if err == nil { // Successfully reconnected + pc.log.Info("Successfully reconnected") return } diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go index e00c1e1..7cfc45d 100644 --- a/pulsar/impl_partition_producer.go +++ b/pulsar/impl_partition_producer.go @@ -89,7 +89,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions topic: topic, options: options, producerID: client.rpcClient.NewProducerID(), - eventsChan: make(chan interface{}), + eventsChan: make(chan interface{}, 1), batchFlushTicker: time.NewTicker(batchingMaxPublishDelay), publishSemaphore: make(util.Semaphore, maxPendingMessages), pendingQueue: util.NewBlockingQueue(maxPendingMessages), diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 2886a7c..7319662 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -28,10 +28,10 @@ import ( "sync/atomic" "time" - "github.com/golang/protobuf/proto" - "github.com/apache/pulsar-client-go/pkg/auth" "github.com/apache/pulsar-client-go/pkg/pb" + "github.com/apache/pulsar-client-go/util" + "github.com/golang/protobuf/proto" log "github.com/sirupsen/logrus" ) @@ -65,6 +65,9 @@ type Connection interface { type ConsumerHandler interface { MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error + + // ConnectionClosed close the TCP connection. + ConnectionClosed() } type connectionState int @@ -336,7 +339,9 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by case pb.BaseCommand_ERROR: case pb.BaseCommand_CLOSE_PRODUCER: + c.handleCloseProducer(cmd.GetCloseProducer()) case pb.BaseCommand_CLOSE_CONSUMER: + c.handleCloseConsumer(cmd.GetCloseConsumer()) case pb.BaseCommand_SEND_RECEIPT: c.handleSendReceipt(cmd.GetSendReceipt()) @@ -398,7 +403,7 @@ func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) { if producer, ok := c.listeners[producerID]; ok { producer.ReceivedSendReceipt(response) } else { - c.log.WithField("producerId", producerID).Warn("Got unexpected send receipt for message: ", response.MessageId) + c.log.WithField("producerID", producerID).Warn("Got unexpected send receipt for message: ", response.MessageId) } } @@ -438,6 +443,28 @@ func (c *connection) handlePing() { c.writeCommand(baseCommand(pb.BaseCommand_PONG, &pb.CommandPong{})) } +func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer) { + c.log.Infof("Broker notification of Closed consumer: %d", closeConsumer.GetConsumerId()) + consumerID := closeConsumer.GetConsumerId() + if consumer, ok := c.connWrapper.Consumers[consumerID]; ok { + if !util.IsNil(consumer) { + consumer.ConnectionClosed() + } + } else { + c.log.WithField("consumerID", consumerID).Warnf("Consumer with ID not found while closing consumer") + } +} + +func (c *connection) handleCloseProducer(closeProducer *pb.CommandCloseProducer) { + c.log.Infof("Broker notification of Closed consumer: %d", closeProducer.GetProducerId()) + producerID := closeProducer.GetProducerId() + if producer, ok := c.listeners[producerID]; ok { + producer.ConnectionClosed() + } else { + c.log.WithField("producerID", producerID).Warn("Producer with ID not found while closing producer") + } +} + func (c *connection) RegisterListener(id uint64, listener ConnectionListener) { c.Lock() defer c.Unlock() diff --git a/pulsar/producer.go b/pulsar/producer.go index ff27d3a..a199e23 100644 --- a/pulsar/producer.go +++ b/pulsar/producer.go @@ -66,7 +66,7 @@ type ProducerOptions struct { // SendTimeout set the send timeout (default: 30 seconds) // If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported. // Setting the timeout to -1, will set the timeout to infinity, which can be useful when using Pulsar's message - // deduplication feature. + // duplication feature. SendTimeout time.Duration // MaxPendingMessages set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.