This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
     new 448387d  Support partition consumer receive async and fix batch logic (#43)
448387d is described below

commit 448387d738a2f3af4c8232daa4fac9576d252617
Author: 冉小龙 <ranxiaolong...@gmail.com>
AuthorDate: Wed Aug 14 11:00:00 2019 +0800

    Support partition consumer receive async and fix batch logic (#43)

    Signed-off-by: xiaolong.ran ranxiaolong...@gmail.com

    * Support batch logic for project
    * add unit test case of event time
    * add some unit test cases for producer
    * fix error result type
    * add unit test case of producer flush
    * add receiver queue size test logic
    * support partition consumer receive async
    * add unit test case of ack timeout
    * Fix consumer receiving message out of order
---
 pulsar/consumer.go                |   3 +
 pulsar/consumer_test.go           | 301 +++++++++++++++++++++++++++++++++++++-
 pulsar/error.go                   |   2 +-
 pulsar/impl_consumer.go           |  55 +++++--
 pulsar/impl_partition_consumer.go | 217 ++++++++++++++++-----------
 pulsar/impl_partition_producer.go |  21 ++-
 pulsar/internal/commands.go       |  82 +++++++----
 pulsar/internal/connection.go     |  54 +++----
 pulsar/producer_test.go           | 260 +++++++++++++++++++++++++++++++-
 pulsar/unackedMsgTracker.go       |  20 +--
 util/util.go                      |  24 ++-
 util/util_test.go                 |  21 ++-
 12 files changed, 870 insertions(+), 190 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index de190e0..c259cd6 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -142,6 +142,9 @@ type Consumer interface {
     // ReceiveAsync appends the message to the msgs channel asynchronously.
     ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error
 
+    // ReceiveAsyncWithCallback returns a callback containing the message and error objects
+    ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error))
+
     // Ack the consumption of a single message
     Ack(Message) error
 
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 39646d3..6fe86cd 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements. See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied. See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package pulsar
 
@@ -124,6 +122,67 @@ func TestConsumerConnectError(t *testing.T) {
     assert.Equal(t, err.Error(), "connection error")
 }
 
+func TestBatchMessageReceive(t *testing.T) {
+    client, err := NewClient(ClientOptions{
+        URL: lookupURL,
+    })
+
+    assert.Nil(t, err)
+    defer client.Close()
+
+    topicName := "persistent://public/default/receive-batch"
+    subName := "subscription-name"
+    prefix := "msg-batch-"
+    ctx := context.Background()
+
+    // Enable batching on producer side
+    batchSize, numOfMessages := 2, 100
+
+    // create producer
+    producer, err := client.CreateProducer(ProducerOptions{
+        Topic:               topicName,
+        BatchingMaxMessages: uint(batchSize),
+        DisableBatching:     false,
+        BlockIfQueueFull:    true,
+    })
+    assert.Nil(t, err)
+    assert.Equal(t, topicName, producer.Topic())
+    defer producer.Close()
+
+    consumer, err := client.Subscribe(ConsumerOptions{
+        Topic:            topicName,
+        SubscriptionName: subName,
+    })
+    assert.Equal(t, topicName, consumer.Topic())
+    count := 0
+
+    for i := 0; i < numOfMessages; i++ {
+        messageContent := prefix + fmt.Sprintf("%d", i)
+        msg := &ProducerMessage{
+            Payload: []byte(messageContent),
+        }
+        err := producer.Send(ctx, msg)
+        assert.Nil(t, err)
+    }
+
+    for i := 0; i < numOfMessages; i++ {
+        msg, err := consumer.Receive(ctx)
+        assert.Nil(t, err)
+        err = consumer.Ack(msg)
+        assert.Nil(t, err)
+        count++
+    }
+
+    // check strategically
+    for i := 0; i < 3; i++ {
+        if count == numOfMessages {
+            break
+        }
+        time.Sleep(time.Second)
+    }
+    assert.Equal(t, count, numOfMessages)
+}
+
 func TestConsumerWithInvalidConf(t *testing.T) {
     client, err := NewClient(ClientOptions{
         URL: lookupURL,
@@ -263,7 +322,7 @@ func TestConsumerKeyShared(t *testing.T) {
         assert.Nil(t, err)
     }
 
-    time.Sleep(time.Second * 5)
+    time.Sleep(time.Second * 1)
 
     go func() {
         for i := 0; i < 10; i++ {
@@ -288,6 +347,8 @@ func TestConsumerKeyShared(t *testing.T) {
             }
         }
     }()
+
+    time.Sleep(time.Second * 1)
 }
 
 func TestPartitionTopicsConsumerPubSub(t *testing.T) {
@@ -300,7 +361,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
     topic := "persistent://public/default/testGetPartitions"
     testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions"
 
-    makeHTTPCall(t, http.MethodPut, testURL, "3")
+    makeHTTPCall(t, http.MethodPut, testURL, "5")
 
     // create producer
     producer, err := client.CreateProducer(ProducerOptions{
@@ -316,9 +377,10 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
     assert.Equal(t, topic+"-partition-2", topics[2])
 
     consumer, err := client.Subscribe(ConsumerOptions{
-        Topic:            topic,
-        SubscriptionName: "my-sub",
-        Type:             Exclusive,
+        Topic:             topic,
+        SubscriptionName:  "my-sub",
+        Type:              Exclusive,
+        ReceiverQueueSize: 10,
     })
     assert.Nil(t, err)
     defer consumer.Close()
@@ -348,3 +410,228 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
 
     assert.Equal(t, len(msgs), 10)
 }
+
+func TestConsumer_ReceiveAsync(t *testing.T) {
+    client, err := NewClient(ClientOptions{
+        URL: lookupURL,
+    })
+
+    assert.Nil(t, err)
+    defer client.Close()
+
+    topicName := "persistent://public/default/receive-async"
+    subName := "subscription-receive-async"
+    ctx := context.Background()
+    ch := make(chan ConsumerMessage, 10)
+
+    // create producer
+    producer, err := client.CreateProducer(ProducerOptions{
+        Topic: topicName,
+    })
+    defer producer.Close()
+
+    consumer, err := client.Subscribe(ConsumerOptions{
+        Topic:            topicName,
+        SubscriptionName: subName,
+    })
+    defer consumer.Close()
+
+    // send 10 messages
+    for i := 0; i < 10; i++ {
+        err := producer.Send(ctx, &ProducerMessage{
+            Payload: []byte(fmt.Sprintf("hello-%d", i)),
+        })
+        assert.Nil(t, err)
+    }
+
+    // receive async 10 messages
+    err = consumer.ReceiveAsync(ctx, ch)
+    assert.Nil(t, err)
+
+    payloadList := make([]string, 0, 10)
+
+RECEIVE:
+    for {
+        select {
+        case cMsg, ok := <-ch:
+            if ok {
+                fmt.Printf("receive message payload is:%s\n", string(cMsg.Payload()))
+                assert.Equal(t, topicName, cMsg.Message.Topic())
+                assert.Equal(t, topicName, cMsg.Consumer.Topic())
+                payloadList = append(payloadList, string(cMsg.Message.Payload()))
+                if len(payloadList) == 10 {
+                    break RECEIVE
+                }
+            }
+            continue RECEIVE
+        case <-ctx.Done():
+            t.Error("context error.")
+            return
+        }
+    }
+}
+
+func TestConsumerAckTimeout(t *testing.T) {
+    client, err := NewClient(ClientOptions{
+        URL: lookupURL,
+    })
+    assert.Nil(t, err)
+    defer client.Close()
+
+    topic := "test-ack-timeout-topic-1"
+    ctx := context.Background()
+
+    // create consumer
+    consumer, err := client.Subscribe(ConsumerOptions{
+        Topic:            topic,
+        SubscriptionName: "my-sub1",
+        Type:             Shared,
+        AckTimeout:       5 * 1000,
+    })
+    assert.Nil(t, err)
+    defer consumer.Close()
+
+    // create consumer1
+    consumer1, err := client.Subscribe(ConsumerOptions{
+        Topic:            topic,
+        SubscriptionName: "my-sub2",
+        Type:             Shared,
+        AckTimeout:       5 * 1000,
+    })
+    assert.Nil(t, err)
+    defer consumer1.Close()
+
+    // create producer
+    producer, err := client.CreateProducer(ProducerOptions{
+        Topic:           topic,
+        DisableBatching: true,
+    })
+    assert.Nil(t, err)
+    defer producer.Close()
+
+    // send 10 messages
+    for i := 0; i < 10; i++ {
+        if err := producer.Send(ctx, &ProducerMessage{
+            Payload: []byte(fmt.Sprintf("hello-%d", i)),
+        }); err != nil {
+            log.Fatal(err)
+        }
+    }
+
+    // consumer receive 10 messages
+    payloadList := make([]string, 0, 10)
+    for i := 0; i < 10; i++ {
+        msg, err := consumer.Receive(context.Background())
+        if err != nil {
+            log.Fatal(err)
+        }
+        payloadList = append(payloadList, string(msg.Payload()))
+
+        // not ack message
+    }
+    assert.Equal(t, 10, len(payloadList))
+
+    // consumer1 receive 10 messages
+    for i := 0; i < 10; i++ {
+        msg, err := consumer1.Receive(context.Background())
+        if err != nil {
+            log.Fatal(err)
+        }
+
+        payloadList = append(payloadList, string(msg.Payload()))
+
+        // ack half of the messages
+        if i%2 == 0 {
+            err = consumer1.Ack(msg)
+            assert.Nil(t, err)
+        }
+    }
+
+    // wait ack timeout
+    time.Sleep(6 * time.Second)
+
+    fmt.Println("start redeliver messages...")
+
+    payloadList = make([]string, 0, 10)
+    // consumer receive messages again
+    for i := 0; i < 10; i++ {
+        msg, err := consumer.Receive(context.Background())
+        if err != nil {
+            log.Fatal(err)
+        }
+        payloadList = append(payloadList, string(msg.Payload()))
+
+        // ack message
+        if err := consumer.Ack(msg); err != nil {
+            log.Fatal(err)
+        }
+    }
+    assert.Equal(t, 10, len(payloadList))
+
+    payloadList = make([]string, 0, 5)
+    // consumer1 receive messages again
+    go func() {
+        for i := 0; i < 10; i++ {
+            msg, err := consumer1.Receive(context.Background())
+            if err != nil {
+                log.Fatal(err)
+            }
+
+            expectMsg := fmt.Sprintf("hello-%d", i)
+            fmt.Printf("redeliver messages, payload is:%s\n", expectMsg)
+            payloadList = append(payloadList, string(msg.Payload()))
+
+            // ack message
+            if err := consumer1.Ack(msg); err != nil {
+                log.Fatal(err)
+            }
+        }
+        assert.Equal(t, 5, len(payloadList))
+    }()
+
+    // sleep 2 seconds to wait for the goroutine to receive messages.
+    time.Sleep(time.Second * 2)
+}
+
+func TestConsumer_ReceiveAsyncWithCallback(t *testing.T) {
+    client, err := NewClient(ClientOptions{
+        URL: lookupURL,
+    })
+
+    assert.Nil(t, err)
+    defer client.Close()
+
+    topicName := "persistent://public/default/receive-async-with-callback"
+    subName := "subscription-receive-async"
+    ctx := context.Background()
+
+    // create producer
+    producer, err := client.CreateProducer(ProducerOptions{
+        Topic: topicName,
+    })
+    defer producer.Close()
+
+    consumer, err := client.Subscribe(ConsumerOptions{
+        Topic:            topicName,
+        SubscriptionName: subName,
+    })
+    defer consumer.Close()
+
+    // send 10 messages
+    for i := 0; i < 10; i++ {
+        err := producer.Send(ctx, &ProducerMessage{
+            Payload: []byte(fmt.Sprintf("hello-%d", i)),
+        })
+        assert.Nil(t, err)
+    }
+
+    for i := 0; i < 10; i++ {
+        consumer.ReceiveAsyncWithCallback(ctx, func(msg Message, err error) {
+            if err != nil {
+                log.Fatal(err)
+            }
+            fmt.Printf("receive message payload is:%s\n", string(msg.Payload()))
+            assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
+        })
+    }
+}
diff --git a/pulsar/error.go b/pulsar/error.go
index 0231913..ec20844 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -24,7 +24,7 @@ type Result int
 
 const (
     // ResultOk means no errors
-    ResultOk = iota
+    ResultOk Result = iota
     // ResultUnknownError means unknown error happened on broker
     ResultUnknownError
     // ResultInvalidConfiguration means invalid configuration
diff --git a/pulsar/impl_consumer.go b/pulsar/impl_consumer.go
index 0a44971..13e72ae 100644
--- a/pulsar/impl_consumer.go
+++ b/pulsar/impl_consumer.go
@@ -79,6 +79,7 @@ func newConsumer(client *client, options *ConsumerOptions) (*consumer, error) {
 func singleTopicSubscribe(client *client, options *ConsumerOptions, topic string) (*consumer, error) {
     c := &consumer{
         topicName: topic,
+        log:       log.WithField("topic", topic),
         queue:     make(chan ConsumerMessage, options.ReceiverQueueSize),
     }
 
@@ -100,7 +101,7 @@ func singleTopicSubscribe(client *client, options *ConsumerOptions, topic string
 
     for partitionIdx, partitionTopic := range partitions {
         go func(partitionIdx int, partitionTopic string) {
-            cons, err := newPartitionConsumer(client, partitionTopic, options, partitionIdx)
+            cons, err := newPartitionConsumer(client, partitionTopic, options, partitionIdx, numPartitions, c.queue)
             ch <- ConsumerError{
                 err:       err,
                 partition: partitionIdx,
@@ -153,31 +154,63 @@ func (c *consumer) Unsubscribe() error {
     return nil
 }
 
-func (c *consumer) Receive(ctx context.Context) (Message, error) {
+func (c *consumer) getMessageFromSubConsumer(ctx context.Context) {
     for _, pc := range c.consumers {
         go func(pc Consumer) {
-            if err := pc.ReceiveAsync(ctx, c.queue); err != nil {
+            err := pc.ReceiveAsync(ctx, c.queue)
+            if err != nil {
                 return
             }
         }(pc)
     }
+}
 
-    select {
-    case <-ctx.Done():
-        return nil, ctx.Err()
-    case msg, ok := <-c.queue:
-        if ok {
-            return msg.Message, nil
+func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
+    if len(c.consumers) > 1 {
+        select {
+        case <-ctx.Done():
+            return nil, ctx.Err()
+        case cMsg, ok := <-c.queue:
+            if ok {
+                return cMsg.Message, nil
+            }
+            return nil, errors.New("receive message error")
         }
-        return nil, errors.New("receive message error")
     }
+
+    return c.consumers[0].(*partitionConsumer).Receive(ctx)
 }
 
 func (c *consumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
-    //TODO: impl logic
+    for _, pc := range c.consumers {
+        go func(pc Consumer) {
+            if err := pc.ReceiveAsync(ctx, msgs); err != nil {
c.log.Errorf("receive async messages error:%s, please check.", err.Error()) + return + } + }(pc) + } + return nil } +func (c *consumer) ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error)) { + var err error + if len(c.consumers) > 1 { + select { + case <-ctx.Done(): + c.log.Errorf("ReceiveAsyncWithCallback: receive message error:%s", ctx.Err().Error()) + return + case cMsg, ok := <-c.queue: + if ok { + callback(cMsg.Message, err) + } + return + } + } + c.consumers[0].(*partitionConsumer).ReceiveAsyncWithCallback(ctx, callback) +} + //Ack the consumption of a single message func (c *consumer) Ack(msg Message) error { return c.AckID(msg.ID()) diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go index 0d7069f..87cf68b 100644 --- a/pulsar/impl_partition_consumer.go +++ b/pulsar/impl_partition_consumer.go @@ -37,7 +37,7 @@ const maxRedeliverUnacknowledged = 1000 type consumerState int const ( - consumerInit = iota + consumerInit consumerState = iota consumerReady consumerClosing consumerClosed @@ -60,16 +60,17 @@ type partitionConsumer struct { consumerID uint64 subQueue chan ConsumerMessage - omu sync.Mutex // protects following - overflow []*pb.MessageIdData + omu sync.Mutex // protects following + redeliverMessages []*pb.MessageIdData unAckTracker *UnackedMessageTracker eventsChan chan interface{} partitionIdx int + partitionNum int } -func newPartitionConsumer(client *client, topic string, options *ConsumerOptions, partitionID int) (*partitionConsumer, error) { +func newPartitionConsumer(client *client, topic string, options *ConsumerOptions, partitionID, partitionNum int, ch chan ConsumerMessage) (*partitionConsumer, error) { c := &partitionConsumer{ state: consumerInit, client: client, @@ -78,6 +79,7 @@ func newPartitionConsumer(client *client, topic string, options *ConsumerOptions log: log.WithField("topic", topic), consumerID: client.rpcClient.NewConsumerID(), partitionIdx: partitionID, + partitionNum: partitionNum, eventsChan: make(chan interface{}), subQueue: make(chan ConsumerMessage, options.ReceiverQueueSize), } @@ -108,7 +110,7 @@ func newPartitionConsumer(client *client, topic string, options *ConsumerOptions if options.Type == Shared || options.Type == KeyShared { if options.AckTimeout != 0 { c.unAckTracker = NewUnackedMessageTracker() - c.unAckTracker.pc = c + c.unAckTracker.pcs = append(c.unAckTracker.pcs, c) c.unAckTracker.Start(int64(options.AckTimeout)) } } @@ -128,6 +130,18 @@ func newPartitionConsumer(client *client, topic string, options *ConsumerOptions c.log = c.log.WithField("name", c.consumerName) c.log.Info("Created consumer") c.state = consumerReady + + // In here, open a gorutine to receive data asynchronously from the subConsumer, + // filling the queue channel of the current consumer. 
+    if partitionNum > 1 {
+        go func() {
+            err = c.ReceiveAsync(context.Background(), ch)
+            if err != nil {
+                return
+            }
+        }()
+    }
+
     go c.runEventsLoop()
 
     return c, nil
@@ -238,35 +252,60 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *handleUnsubscribe) {
     unsub.waitGroup.Done()
 }
 
-func (pc *partitionConsumer) Receive(ctx context.Context) (Message, error) {
-    select {
-    case <-ctx.Done():
-        return nil, ctx.Err()
-    case cm, ok := <-pc.subQueue:
-        if ok {
-            id := &pb.MessageIdData{}
-            err := proto.Unmarshal(cm.ID().Serialize(), id)
-            if err != nil {
-                pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
-                return nil, err
-            }
-            if pc.unAckTracker != nil {
-                pc.unAckTracker.Add(id)
-            }
-            return cm.Message, nil
-        }
-        return nil, newError(ResultConnectError, "receive queue closed")
+func (pc *partitionConsumer) trackMessage(msgID MessageID) error {
+    id := &pb.MessageIdData{}
+    err := proto.Unmarshal(msgID.Serialize(), id)
+    if err != nil {
+        pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
+        return err
+    }
+    if pc.unAckTracker != nil {
+        pc.unAckTracker.Add(id)
     }
+    return nil
 }
 
-func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
+func (pc *partitionConsumer) increaseAvailablePermits(receivedSinceFlow uint32) error {
     highwater := uint32(math.Max(float64(cap(pc.options.MessageChannel)/2), 1))
+    if receivedSinceFlow >= highwater {
+        if err := pc.internalFlow(receivedSinceFlow); err != nil {
+            pc.log.Errorf("Send Flow cmd error:%s", err.Error())
+            return err
+        }
+        receivedSinceFlow = 0
+    }
+    return nil
+}
+
+func (pc *partitionConsumer) messageProcessed(msgID MessageID, receivedSinceFlow uint32) error {
+    err := pc.trackMessage(msgID)
+    if err != nil {
+        return err
+    }
+    receivedSinceFlow++
 
-    // request half the buffer's capacity
-    if err := pc.internalFlow(highwater); err != nil {
-        pc.log.Errorf("Send Flow cmd error:%s", err.Error())
+    err = pc.increaseAvailablePermits(receivedSinceFlow)
+    if err != nil {
         return err
     }
+
+    return nil
+}
+
+func (pc *partitionConsumer) Receive(ctx context.Context) (message Message, err error) {
+    wg := &sync.WaitGroup{}
+    wg.Add(1)
+    pc.ReceiveAsyncWithCallback(ctx, func(msg Message, e error) {
+        message = msg
+        err = e
+        wg.Done()
+    })
+    wg.Wait()
+
+    return message, err
+}
+
+func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
     var receivedSinceFlow uint32
 
     for {
@@ -274,30 +313,38 @@ func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- Consu
         case tmpMsg, ok := <-pc.subQueue:
             if ok {
                 msgs <- tmpMsg
-                id := &pb.MessageIdData{}
-                err := proto.Unmarshal(tmpMsg.ID().Serialize(), id)
+
+                err := pc.messageProcessed(tmpMsg.ID(), receivedSinceFlow)
                 if err != nil {
-                    pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
                     return err
                 }
-                if pc.unAckTracker != nil {
-                    pc.unAckTracker.Add(id)
-                }
-                receivedSinceFlow++
-                if receivedSinceFlow >= highwater {
-                    if err := pc.internalFlow(receivedSinceFlow); err != nil {
-                        pc.log.Errorf("Send Flow cmd error:%s", err.Error())
-                        return err
-                    }
-                    receivedSinceFlow = 0
-                }
                 continue
             }
+            break
         case <-ctx.Done():
             return ctx.Err()
         }
     }
+}
 
+func (pc *partitionConsumer) ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error)) {
+    var receivedSinceFlow uint32
+    var err error
+
+    select {
+    case tmpMsg, ok := <-pc.subQueue:
+        if ok {
+            err = pc.messageProcessed(tmpMsg.ID(), receivedSinceFlow)
+            callback(tmpMsg.Message, err)
+            if err != nil {
+                pc.log.Errorf("processed messages error:%s", err.Error())
+                return
+            }
+        }
+    case <-ctx.Done():
+        pc.log.Errorf("context shouldn't done, please check error:%s", ctx.Err().Error())
+        return
+    }
 }
 
 func (pc *partitionConsumer) Ack(msg Message) error {
@@ -465,23 +512,23 @@ func (pc *partitionConsumer) internalRedeliver(redeliver *handleRedeliver) {
     pc.omu.Lock()
     defer pc.omu.Unlock()
 
-    overFlowSize := len(pc.overflow)
+    redeliverMessagesSize := len(pc.redeliverMessages)
 
-    if overFlowSize == 0 {
+    if redeliverMessagesSize == 0 {
         return
     }
 
     requestID := pc.client.rpcClient.NewRequestID()
 
-    for i := 0; i < len(pc.overflow); i += maxRedeliverUnacknowledged {
+    for i := 0; i < len(pc.redeliverMessages); i += maxRedeliverUnacknowledged {
         end := i + maxRedeliverUnacknowledged
-        if end > overFlowSize {
-            end = overFlowSize
+        if end > redeliverMessagesSize {
+            end = redeliverMessagesSize
         }
         _, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID,
             pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{
                 ConsumerId: proto.Uint64(pc.consumerID),
-                MessageIds: pc.overflow[i:end],
+                MessageIds: pc.redeliverMessages[i:end],
             })
         if err != nil {
             pc.log.WithError(err).Error("Failed to unsubscribe consumer")
@@ -489,8 +536,8 @@ func (pc *partitionConsumer) internalRedeliver(redeliver *handleRedeliver) {
         }
     }
 
-    // clear Overflow slice
-    pc.overflow = nil
+    // clear redeliverMessages slice
+    pc.redeliverMessages = nil
 
     if pc.unAckTracker != nil {
         pc.unAckTracker.clear()
@@ -574,56 +621,58 @@ func (pc *partitionConsumer) internalFlow(permits uint32) error {
     return nil
 }
 
-func (pc *partitionConsumer) HandlerMessage(response *pb.CommandMessage, headersAndPayload []byte) error {
+func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error {
     msgID := response.GetMessageId()
 
     id := newMessageID(int64(msgID.GetLedgerId()), int64(msgID.GetEntryId()),
         int(msgID.GetBatchIndex()), pc.partitionIdx)
 
-    msgMeta, payload, err := internal.ParseMessage(headersAndPayload)
+    msgMeta, payloadList, err := internal.ParseMessage(headersAndPayload)
     if err != nil {
         return fmt.Errorf("parse message error:%s", err)
     }
 
-    //numMsgs := msgMeta.GetNumMessagesInBatch()
-
-    msg := &message{
-        publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
-        eventTime:   timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
-        key:         msgMeta.GetPartitionKey(),
-        properties:  internal.ConvertToStringMap(msgMeta.GetProperties()),
-        topic:       pc.topic,
-        msgID:       id,
-        payLoad:     payload,
-    }
+    for _, payload := range payloadList {
+        msg := &message{
+            publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+            eventTime:   timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+            key:         msgMeta.GetPartitionKey(),
+            properties:  internal.ConvertToStringMap(msgMeta.GetProperties()),
+            topic:       pc.topic,
+            msgID:       id,
+            payLoad:     payload,
+        }
 
-    consumerMsg := ConsumerMessage{
-        Message:  msg,
-        Consumer: pc,
-    }
+        consumerMsg := ConsumerMessage{
+            Message:  msg,
+            Consumer: pc,
+        }
 
-    select {
-    case pc.subQueue <- consumerMsg:
-        // Add messageId to Overflow buffer, avoiding duplicates.
-        newMid := response.GetMessageId()
-        var dup bool
-
-        pc.omu.Lock()
-        for _, mid := range pc.overflow {
-            if proto.Equal(mid, newMid) {
-                dup = true
-                break
+        select {
+        case pc.subQueue <- consumerMsg:
+            // Add messageId to redeliverMessages buffer, avoiding duplicates.
+            newMid := response.GetMessageId()
+            var dup bool
+
+            pc.omu.Lock()
+            for _, mid := range pc.redeliverMessages {
+                if proto.Equal(mid, newMid) {
+                    dup = true
+                    break
+                }
             }
-        }
 
-        if !dup {
-            pc.overflow = append(pc.overflow, newMid)
+            if !dup {
+                pc.redeliverMessages = append(pc.redeliverMessages, newMid)
+            }
+            pc.omu.Unlock()
+            continue
+        default:
+            return fmt.Errorf("consumer message channel on topic %s is full (capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel))
         }
-        pc.omu.Unlock()
-        return nil
-    default:
-        return fmt.Errorf("consumer message channel on topic %s is full (capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel))
     }
+
+    return nil
 }
 
 type handleAck struct {
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 98e2156..e00c1e1 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -34,7 +34,7 @@ import (
 type producerState int
 
 const (
-    producerInit = iota
+    producerInit producerState = iota
     producerReady
     producerClosing
     producerClosed
@@ -249,7 +249,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
     sequenceID := internal.GetAndAdd(p.sequenceIDGenerator, 1)
 
     if sendAsBatch {
-        for p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters) == false {
+        ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
+        if ok == false {
             // The current batch is full.. flush it and retry
             p.internalFlushCurrentBatch()
         }
@@ -321,13 +322,25 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) erro
 
 func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
     callback func(MessageID, *ProducerMessage, error)) {
     p.publishSemaphore.Acquire()
-    p.eventsChan <- &sendRequest{ctx, msg, callback, false}
+    sr := &sendRequest{
+        ctx:              ctx,
+        msg:              msg,
+        callback:         callback,
+        flushImmediately: false,
+    }
+    p.eventsChan <- sr
 }
 
 func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
     callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
     p.publishSemaphore.Acquire()
-    p.eventsChan <- &sendRequest{ctx, msg, callback, flushImmediately}
+    sr := &sendRequest{
+        ctx:              ctx,
+        msg:              msg,
+        callback:         callback,
+        flushImmediately: flushImmediately,
+    }
+    p.eventsChan <- sr
 }
 
 func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 04545c1..1f29f7f 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -87,7 +87,7 @@ func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload [
     wb.Write(payload)
 }
 
-func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payload []byte, err error) {
+func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloadList [][]byte, err error) {
     // reusable buffer for 4-byte uint32s
     buf32 := make([]byte, 4)
     r := bytes.NewReader(headersAndPayload)
@@ -164,33 +164,63 @@ func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloa
         return nil, nil, err
     }
 
+    numMsg := msgMeta.GetNumMessagesInBatch()
+
+    if numMsg > 0 && msgMeta.NumMessagesInBatch != nil {
+        payloads := make([]byte, lr.N)
+        if _, err = io.ReadFull(lr, payloads); err != nil {
+            return nil, nil, err
+        }
+
+        singleMessages, err := decodeBatchPayload(payloads, numMsg)
+        if err != nil {
+            return nil, nil, err
+        }
+
+        payloadList = make([][]byte, 0, numMsg)
+        for _, singleMsg := range singleMessages {
+            msgMeta.PartitionKey = singleMsg.SingleMeta.PartitionKey
+            msgMeta.Properties = singleMsg.SingleMeta.Properties
+            msgMeta.EventTime = singleMsg.SingleMeta.EventTime
+            payloadList = append(payloadList, singleMsg.SinglePayload)
+        }
+
+        if err := computeChecksum(chksum, expectedChksum); err != nil {
+            return nil, nil, err
+        }
+
+        return msgMeta, payloadList, nil
+    }
+
     // Anything left in the frame is considered
     // the payload and can be any sequence of bytes.
-    payloads := make([]byte, lr.N)
-    if _, err = io.ReadFull(lr, payloads); err != nil {
-        return nil, nil, err
-    }
+    payloadList = make([][]byte, 0, 10)
+    if lr.N > 0 {
+        // guard against allocating large buffer
+        if lr.N > MaxFrameSize {
+            return nil, nil, fmt.Errorf("frame payload size (%d) cannot be greater than max frame size (%d)", lr.N, MaxFrameSize)
+        }
 
-    numMsg := msgMeta.GetNumMessagesInBatch()
+        payload := make([]byte, lr.N)
+        if _, err = io.ReadFull(lr, payload); err != nil {
+            return nil, nil, err
+        }
 
-    singleMessages, err := decodeBatchPayload(payloads, numMsg)
-    if err != nil {
-        return nil, nil, err
+        payloadList = append(payloadList, payload)
     }
 
-    for _, singleMsg := range singleMessages {
-        payload = singleMsg.SinglePayload
-        msgMeta.PartitionKey = singleMsg.SingleMeta.PartitionKey
-        msgMeta.Properties = singleMsg.SingleMeta.Properties
-        msgMeta.EventTime = singleMsg.SingleMeta.EventTime
+    if err := computeChecksum(chksum, expectedChksum); err != nil {
+        return nil, nil, err
     }
 
-    if computed := chksum.compute(); !bytes.Equal(computed, expectedChksum) {
-        return nil, nil, fmt.Errorf("checksum mismatch: computed (0x%X) does "+
+    return msgMeta, payloadList, nil
+}
+
+func computeChecksum(chksum CheckSum, expectedChksum []byte) error {
+    computed := chksum.compute()
+    if !bytes.Equal(computed, expectedChksum) {
+        return fmt.Errorf("checksum mismatch: computed (0x%X) does "+
             "not match given checksum (0x%X)", computed, expectedChksum)
     }
-
-    return msgMeta, payload, nil
+    return nil
 }
 
 func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageMetadata, payload []byte) {
@@ -252,7 +282,7 @@ type singleMessage struct {
 func decodeBatchPayload(bp []byte, batchNum int32) ([]*singleMessage, error) {
     buf32 := make([]byte, 4)
     rdBuf := bytes.NewReader(bp)
-    list := make([]*singleMessage, 0, batchNum)
+    singleMsgList := make([]*singleMessage, 0, batchNum)
     for i := int32(0); i < batchNum; i++ {
         // singleMetaSize
         if _, err := io.ReadFull(rdBuf, buf32); err != nil {
@@ -274,13 +304,15 @@ func decodeBatchPayload(bp []byte, batchNum int32) ([]*singleMessage, error) {
         if _, err := io.ReadFull(rdBuf, singlePayload); err != nil {
             return nil, err
         }
-        d := &singleMessage{}
-        d.SingleMetaSize = singleMetaSize
-        d.SingleMeta = singleMeta
-        d.SinglePayload = singlePayload
-        list = append(list, d)
+        singleMsg := &singleMessage{
+            SingleMetaSize: singleMetaSize,
+            SingleMeta:     singleMeta,
+            SinglePayload:  singlePayload,
+        }
+
+        singleMsgList = append(singleMsgList, singleMsg)
     }
-    return list, nil
+    return singleMsgList, nil
 }
 
 // ConvertFromStringMap convert a string map to a KeyValue []byte
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index d83f30b..2c707ea 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -64,7 +64,7 @@ type Connection interface {
 }
 
 type ConsumerHandler interface {
-    HandlerMessage(response *pb.CommandMessage, headersAndPayload []byte) error
+    MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error
 }
 
 type connectionState int
@@ -131,7 +131,7 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSO
         incomingRequests: make(chan *request),
         writeRequests:    make(chan []byte),
         listeners:        make(map[uint64]ConnectionListener),
-        connWrapper: NewConnWrapper(),
+        connWrapper:      NewConnWrapper(),
     }
     cnx.reader = newConnectionReader(cnx)
     cnx.cond = sync.NewCond(cnx)
@@ -307,7 +307,7 @@ func (c *connection) writeCommand(cmd proto.Message) {
 func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []byte) {
     c.log.Debugf("Received command: %s -- payload: %v", cmd, headersAndPayload)
     c.lastDataReceivedTime = time.Now()
-    var err error
+    var err error
 
     switch *cmd.Type {
     case pb.BaseCommand_SUCCESS:
@@ -344,7 +344,7 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
     case pb.BaseCommand_SEND_ERROR:
 
     case pb.BaseCommand_MESSAGE:
-        err = c.handleMessage(cmd.GetMessage(), headersAndPayload)
+        err = c.handleMessage(cmd.GetMessage(), headersAndPayload)
     case pb.BaseCommand_PING:
         c.handlePing()
     case pb.BaseCommand_PONG:
@@ -353,9 +353,9 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
     case pb.BaseCommand_ACTIVE_CONSUMER_CHANGE:
 
     default:
-        if err != nil {
-            c.log.Errorf("Received invalid command type: %s", cmd.Type)
-        }
+        if err != nil {
+            c.log.Errorf("Received invalid command type: %s", cmd.Type)
+        }
         c.Close()
     }
 }
@@ -403,18 +403,18 @@ func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
 }
 
 func (c *connection) handleMessage(response *pb.CommandMessage, payload []byte) error {
-    c.log.Debug("Got Message: ", response)
-    consumerId := response.GetConsumerId()
-    if consumer, ok := c.connWrapper.Consumers[consumerId]; ok {
-        err := consumer.HandlerMessage(response, payload)
-        if err != nil {
-            c.log.WithField("consumerId", consumerId).Error("handle message err: ", response.MessageId)
-            return errors.New("handler not found")
-        }
-    } else {
-        c.log.WithField("consumerId", consumerId).Warn("Got unexpected message: ", response.MessageId)
-    }
-    return nil
+    c.log.Debug("Got Message: ", response)
+    consumerId := response.GetConsumerId()
+    if consumer, ok := c.connWrapper.Consumers[consumerId]; ok {
+        err := consumer.MessageReceived(response, payload)
+        if err != nil {
+            c.log.WithField("consumerId", consumerId).Error("handle message err: ", response.MessageId)
+            return errors.New("handler not found")
+        }
+    } else {
+        c.log.WithField("consumerId", consumerId).Warn("Got unexpected message: ", response.MessageId)
+    }
+    return nil
 }
 
 func (c *connection) sendPing() {
@@ -522,8 +522,8 @@ func (c *connection) getTLSConfig() (*tls.Config, error) {
 }
 
 type ConnWrapper struct {
-    Rwmu      sync.RWMutex
-    Consumers map[uint64]ConsumerHandler
+    Rwmu      sync.RWMutex
+    Consumers map[uint64]ConsumerHandler
 }
 
 func NewConnWrapper() *ConnWrapper {
@@ -533,13 +533,13 @@ func NewConnWrapper() *ConnWrapper {
 }
 
 func (c *connection) AddConsumeHandler(id uint64, handler ConsumerHandler) {
-    c.connWrapper.Rwmu.Lock()
-    c.connWrapper.Consumers[id] = handler
-    c.connWrapper.Rwmu.Unlock()
+    c.connWrapper.Rwmu.Lock()
+    c.connWrapper.Consumers[id] = handler
+    c.connWrapper.Rwmu.Unlock()
 }
 
 func (c *connection) DeleteConsumeHandler(id uint64) {
-    c.connWrapper.Rwmu.Lock()
-    delete(c.connWrapper.Consumers, id)
-    c.connWrapper.Rwmu.Unlock()
+    c.connWrapper.Rwmu.Lock()
+    delete(c.connWrapper.Consumers, id)
+    c.connWrapper.Rwmu.Unlock()
 }
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 66683d0..e9327d0 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -20,6 +20,7 @@ package pulsar
 import (
     "context"
     "fmt"
+    "net/http"
     "sync"
     "testing"
     "time"
@@ -30,6 +31,55 @@ import (
     log "github.com/sirupsen/logrus"
 )
 
+func TestInvalidURL(t *testing.T) {
+    client, err := NewClient(ClientOptions{})
+
+    if client != nil || err == nil {
+        t.Fatal("Should have failed to create client")
+    }
+}
+
+func TestProducerConnectError(t *testing.T) {
+    client, err := NewClient(ClientOptions{
+        URL: "pulsar://invalid-hostname:6650",
+    })
+
+    assert.Nil(t, err)
+
+    defer client.Close()
+
+    producer, err := client.CreateProducer(ProducerOptions{
+        Topic: newTopicName(),
+    })
+
+    // Expect error in creating producer
+    assert.Nil(t, producer)
+    assert.NotNil(t, err)
+
+    assert.Equal(t, err.Error(), "connection error")
+}
+
+func TestProducerNoTopic(t *testing.T) {
+    client, err := NewClient(ClientOptions{
+        URL: "pulsar://localhost:6650",
+    })
+
+    if err != nil {
+        t.Fatal(err)
+        return
+    }
+
+    defer client.Close()
+
+    producer, err := client.CreateProducer(ProducerOptions{})
+
+    // Expect error in creating producer
+    assert.Nil(t, producer)
+    assert.NotNil(t, err)
+
+    assert.Equal(t, err.(*Error).Result(), ResultInvalidTopicName)
+}
+
 func TestSimpleProducer(t *testing.T) {
     client, err := NewClient(ClientOptions{
         URL: serviceURL,
@@ -92,7 +142,8 @@ func TestProducerAsyncSend(t *testing.T) {
         assert.NoError(t, err)
     }
 
-    producer.Flush()
+    err = producer.Flush()
+    assert.Nil(t, err)
 
     wg.Wait()
 
@@ -181,6 +232,213 @@ func TestProducerLastSequenceID(t *testing.T) {
     assert.NoError(t, err)
 }
 
+func TestEventTime(t *testing.T) {
+    client, err := NewClient(ClientOptions{
+        URL: serviceURL,
+    })
+    assert.NoError(t, err)
+    defer client.Close()
+
+    topicName := "test-event-time"
+    producer, err := client.CreateProducer(ProducerOptions{
+        Topic: topicName,
+    })
+    assert.Nil(t, err)
+    defer producer.Close()
+
+    consumer, err := client.Subscribe(ConsumerOptions{
+        Topic:            topicName,
+        SubscriptionName: "subName",
+    })
+    assert.Nil(t, err)
+    defer consumer.Close()
+
+    eventTime := timeFromUnixTimestampMillis(uint64(1565161612))
+    err = producer.Send(context.Background(), &ProducerMessage{
+        Payload:   []byte(fmt.Sprintf("test-event-time")),
+        EventTime: &eventTime,
+    })
+    assert.Nil(t, err)
+
+    msg, err := consumer.Receive(context.Background())
+    assert.Nil(t, err)
+    actualEventTime := msg.EventTime()
+    assert.Equal(t, eventTime.Unix(), actualEventTime.Unix())
+}
+
+func TestFlushInProducer(t *testing.T) {
+    client, err := NewClient(ClientOptions{
+        URL: serviceURL,
+    })
+    assert.NoError(t, err)
+    defer client.Close()
+
+    topicName := "test-flush-in-producer"
+    subName := "subscription-name"
+    numOfMessages := 10
+    ctx := context.Background()
+
+    // set batch message number numOfMessages, and max delay 10s
+    producer, err := client.CreateProducer(ProducerOptions{
+        Topic:                   topicName,
+        DisableBatching:         false,
+        BatchingMaxMessages:     uint(numOfMessages),
+        BatchingMaxPublishDelay: time.Second * 10,
+        BlockIfQueueFull:        true,
+        Properties: map[string]string{
+            "producer-name": "test-producer-name",
+            "producer-id":   "test-producer-id",
+        },
+    })
+    defer producer.Close()
+
+    consumer, err := client.Subscribe(ConsumerOptions{
+        Topic:            topicName,
+        SubscriptionName: subName,
+    })
+    assert.Nil(t, err)
+    defer consumer.Close()
+
+    prefix := "msg-batch-async"
+    msgCount := 0
+
+    wg := sync.WaitGroup{}
+    wg.Add(5)
+    errors := util.NewBlockingQueue(10)
+    for i := 0; i < numOfMessages/2; i++ {
+        messageContent := prefix + fmt.Sprintf("%d", i)
+        producer.SendAsync(ctx, &ProducerMessage{
+            Payload: []byte(messageContent),
+        }, func(id MessageID, producerMessage *ProducerMessage, e error) {
+            if e != nil {
+                log.WithError(e).Error("Failed to publish")
+                errors.Put(e)
+            } else {
+                log.Info("Published message ", id)
+            }
+            wg.Done()
+        })
+        assert.Nil(t, err)
+    }
+    err = producer.Flush()
+    assert.Nil(t, err)
+    wg.Wait()
+
+    for i := 0; i < numOfMessages/2; i++ {
+        _, err := consumer.Receive(ctx)
+        assert.Nil(t, err)
+        msgCount++
+    }
+
+    assert.Equal(t, msgCount, numOfMessages/2)
+
+    wg.Add(5)
+    for i := numOfMessages / 2; i < numOfMessages; i++ {
+        messageContent := prefix + fmt.Sprintf("%d", i)
+        producer.SendAsync(ctx, &ProducerMessage{
+            Payload: []byte(messageContent),
+        }, func(id MessageID, producerMessage *ProducerMessage, e error) {
+            if e != nil {
+                log.WithError(e).Error("Failed to publish")
+                errors.Put(e)
+            } else {
+                log.Info("Published message ", id)
+            }
+            wg.Done()
+        })
+        assert.Nil(t, err)
+    }
+
+    err = producer.Flush()
+    assert.Nil(t, err)
+    wg.Wait()
+
+    for i := numOfMessages / 2; i < numOfMessages; i++ {
+        _, err := consumer.Receive(ctx)
+        assert.Nil(t, err)
+        msgCount++
+    }
+    assert.Equal(t, msgCount, numOfMessages)
+}
+
+func TestFlushInPartitionedProducer(t *testing.T) {
+    topicName := "persistent://public/default/partition-testFlushInPartitionedProducer"
+
+    // call admin api to make it partitioned
+    url := adminURL + "/" + "admin/v2/" + topicName + "/partitions"
+    makeHTTPCall(t, http.MethodPut, url, "5")
+
+    numberOfPartitions := 5
+    numOfMessages := 10
+    ctx := context.Background()
+
+    // create client connection
+    client, err := NewClient(ClientOptions{
+        URL: serviceURL,
+    })
+    assert.NoError(t, err)
+    defer client.Close()
+
+    // create consumer
+    consumer, err := client.Subscribe(ConsumerOptions{
+        Topic:            topicName,
+        SubscriptionName: "my-sub",
+        Type:             Exclusive,
+    })
+    assert.Nil(t, err)
+    defer consumer.Close()
+
+    // create producer and set batch message number numOfMessages, and max delay 10s
+    producer, err := client.CreateProducer(ProducerOptions{
+        Topic:                   topicName,
+        DisableBatching:         false,
+        BatchingMaxMessages:     uint(numOfMessages / numberOfPartitions),
+        BatchingMaxPublishDelay: time.Second * 10,
+        BlockIfQueueFull:        true,
+    })
+    defer producer.Close()
+
+    // send 5 messages
+    prefix := "msg-batch-async-"
+    wg := sync.WaitGroup{}
+    wg.Add(5)
+    errors := util.NewBlockingQueue(5)
+    for i := 0; i < numOfMessages/2; i++ {
+        messageContent := prefix + fmt.Sprintf("%d", i)
+        producer.SendAsync(ctx, &ProducerMessage{
+            Payload: []byte(messageContent),
+        }, func(id MessageID, producerMessage *ProducerMessage, e error) {
+            if e != nil {
+                log.WithError(e).Error("Failed to publish")
+                errors.Put(e)
+            } else {
+                log.Info("Published message: ", id)
+            }
+            wg.Done()
+        })
+        assert.Nil(t, err)
+    }
+
+    // After flush, should be able to consume.
+    err = producer.Flush()
+    assert.Nil(t, err)
+
+    wg.Wait()
+
+    // Receive all messages
+    msgCount := 0
+    for i := 0; i < numOfMessages/2; i++ {
+        msg, err := consumer.Receive(ctx)
+        fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
+            msg.ID(), string(msg.Payload()))
+        assert.Nil(t, err)
+        err = consumer.Ack(msg)
+        assert.Nil(t, err)
+        msgCount++
+    }
+    assert.Equal(t, msgCount, numOfMessages/2)
+}
+
 func TestMessageRouter(t *testing.T) {
     // Create topic with 5 partitions
     httpPut("http://localhost:8080/admin/v2/persistent/public/default/my-partitioned-topic/partitions", 5)
diff --git a/pulsar/unackedMsgTracker.go b/pulsar/unackedMsgTracker.go
index 8ec51c6..c46b731 100644
--- a/pulsar/unackedMsgTracker.go
+++ b/pulsar/unackedMsgTracker.go
@@ -34,7 +34,6 @@ type UnackedMessageTracker struct {
     oldOpenSet set.Set
     timeout    *time.Ticker
 
-    pc  *partitionConsumer
     pcs []*partitionConsumer
 }
 
@@ -159,22 +158,7 @@ func (t *UnackedMessageTracker) handlerCmd(ackTimeoutMillis int64) {
 
                 t.oldOpenSet.Clear()
 
-                if t.pc != nil {
-                    requestID := t.pc.client.rpcClient.NewRequestID()
-                    cmd := &pb.CommandRedeliverUnacknowledgedMessages{
-                        ConsumerId: proto.Uint64(t.pc.consumerID),
-                        MessageIds: messageIds,
-                    }
-
-                    _, err := t.pc.client.rpcClient.RequestOnCnx(t.pc.cnx, requestID,
-                        pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, cmd)
-                    if err != nil {
-                        t.pc.log.WithError(err).Error("Failed to unsubscribe consumer")
-                        return
-                    }
-
-                    log.Debugf("consumer:%v redeliver messages num:%d", t.pc.consumerName, len(messageIds))
-                } else if t.pcs != nil {
+                if t.pcs != nil {
                     messageIdsMap := make(map[int32][]*pb.MessageIdData)
                     for _, msgID := range messageIds {
                         messageIdsMap[msgID.GetPartition()] = append(messageIdsMap[msgID.GetPartition()], msgID)
@@ -198,7 +182,7 @@ func (t *UnackedMessageTracker) handlerCmd(ackTimeoutMillis int64) {
                     }
                 }
             }
-            log.Debug("Tick at ", tick)
+            log.Debugf("Tick at: %v", tick)
         }
 
         t.toggle()
diff --git a/util/util.go b/util/util.go
index 06a7e53..bd4f5d6 100644
--- a/util/util.go
+++ b/util/util.go
@@ -18,15 +18,27 @@
 package util
 
 import (
-    `reflect`
+    "reflect"
 )
 
 // IsNil check if the interface is nil
 func IsNil(i interface{}) bool {
-    vi := reflect.ValueOf(i)
-    if vi.Kind() == reflect.Ptr {
-        return vi.IsNil()
-    }
-    return false
+    vi := reflect.ValueOf(i)
+    if vi.Kind() == reflect.Ptr {
+        return vi.IsNil()
+    }
+    return false
 }
+
+// RemoveDuplicateElement removes repeating elements from the string slice
+func RemoveDuplicateElement(addrs []string) []string {
+    result := make([]string, 0, len(addrs))
+    temp := map[string]struct{}{}
+    for _, item := range addrs {
+        if _, ok := temp[item]; !ok {
+            temp[item] = struct{}{}
+            result = append(result, item)
+        }
+    }
+    return result
+}
diff --git a/util/util_test.go b/util/util_test.go
index 2e1195c..284dd0c 100644
--- a/util/util_test.go
+++ b/util/util_test.go
@@ -18,14 +18,23 @@
 package util
 
 import (
-    `github.com/stretchr/testify/assert`
-    `testing`
+    "fmt"
+    "github.com/stretchr/testify/assert"
+    "strings"
+    "testing"
 )
 
 func TestIsNil(t *testing.T) {
-    var a interface{} = nil
-    var b interface{} = (*int)(nil)
+    var a interface{} = nil
+    var b interface{} = (*int)(nil)
 
-    assert.True(t, a == nil)
-    assert.False(t, b == nil)
+    assert.True(t, a == nil)
+    assert.False(t, b == nil)
+}
+
+func TestRemoveDuplicateElement(t *testing.T) {
+    s := []string{"hello", "world", "hello", "golang", "hello", "ruby", "php", "java"}
+    resList := RemoveDuplicateElement(s)
+    res := fmt.Sprintf("%s", resList[:])
+    assert.Equal(t, 1, strings.Count(res, "hello"))
+}
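
For anyone trying out the receive APIs this commit adds, below is a minimal usage
sketch. It is not code from the commit: the broker URL, topic, and subscription
names are placeholders, and error handling is kept to a minimum; only the
Consumer methods shown in the consumer.go diff above are assumed.

    package main

    import (
        "context"
        "fmt"
        "log"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        // Placeholder broker URL; adjust for your environment.
        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL: "pulsar://localhost:6650",
        })
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        // Placeholder topic and subscription names.
        consumer, err := client.Subscribe(pulsar.ConsumerOptions{
            Topic:            "my-topic",
            SubscriptionName: "my-sub",
        })
        if err != nil {
            log.Fatal(err)
        }
        defer consumer.Close()

        ctx := context.Background()

        // Channel variant: ReceiveAsync keeps feeding the channel until the
        // context is cancelled, so it is typically run in its own goroutine.
        ch := make(chan pulsar.ConsumerMessage, 10)
        go func() {
            if err := consumer.ReceiveAsync(ctx, ch); err != nil {
                log.Println("ReceiveAsync stopped:", err)
            }
        }()

        cm := <-ch
        fmt.Printf("got %q from %s\n", string(cm.Message.Payload()), cm.Consumer.Topic())
        if err := consumer.Ack(cm.Message); err != nil {
            log.Println(err)
        }

        // Callback variant: delivers a single message (or an error) to the callback.
        consumer.ReceiveAsyncWithCallback(ctx, func(msg pulsar.Message, err error) {
            if err != nil {
                log.Println("receive failed:", err)
                return
            }
            fmt.Printf("got %q via callback\n", string(msg.Payload()))
        })
    }

As the impl_consumer.go changes above show, both variants drain the shared queue
channel when the topic has more than one partition and otherwise delegate
directly to the single partition consumer.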