This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 2437caa [feat][txn]Implement transactional consumer/producer API (#1002)
2437caa is described below
commit 2437caa14fb0e2f4ab34eaee08e9a83612244d0f
Author: Xiangying Meng <[email protected]>
AuthorDate: Sat May 6 09:53:12 2023 +0800
[feat][txn]Implement transactional consumer/producer API (#1002)
* [feat][txn]Implement transactional consumer/producer API
* rollback
* fix test
* optimize code
* state check
* fix callback
* golangci-lint
* add more test
---
pulsar/client.go | 11 +
pulsar/client_impl.go | 8 +
pulsar/consumer.go | 3 +
pulsar/consumer_impl.go | 10 +
pulsar/consumer_multitopic.go | 17 ++
pulsar/consumer_partition.go | 174 +++++++++++----
pulsar/consumer_regex.go | 18 ++
pulsar/helper_for_test.go | 2 +-
pulsar/internal/batch_builder.go | 8 +
pulsar/internal/commands.go | 12 +-
pulsar/internal/http_client.go | 4 +-
pulsar/internal/key_based_batch_builder.go | 5 +-
.../pulsartracing/consumer_interceptor_test.go | 4 +
pulsar/message.go | 4 +
pulsar/producer_partition.go | 110 ++++++++--
pulsar/transaction.go | 8 +-
pulsar/transaction_coordinator_client.go | 18 +-
pulsar/transaction_impl.go | 2 +
pulsar/transaction_test.go | 240 ++++++++++++++++++++-
19 files changed, 573 insertions(+), 85 deletions(-)
diff --git a/pulsar/client.go b/pulsar/client.go
index bc3f4f5..7e6725d 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -184,6 +184,17 @@ type Client interface {
// {@link Consumer} or {@link Producer} instances directly on a particular partition.
TopicPartitions(topic string) ([]string, error)
+ // NewTransaction creates a new Transaction instance.
+ //
+ // This function is used to initiate a new transaction for performing
+ // atomic operations on the message broker. It returns a Transaction
+ // object that can be used to produce, consume and commit messages in a
+ // transactional manner.
+ //
+ // In case of any errors while creating the transaction, an error will
+ // be returned.
+ NewTransaction(duration time.Duration) (Transaction, error)
+
// Close Closes the Client and free associated resources
Close()
}
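For reference, a minimal usage sketch of the new API (assumes a broker with the transaction coordinator enabled; the URL and topic name below are placeholders):

    package main

    import (
        "context"
        "log"
        "time"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        // EnableTransaction is required so the client starts its transaction coordinator client.
        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL:               "pulsar://localhost:6650", // placeholder
            EnableTransaction: true,
        })
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        producer, err := client.CreateProducer(pulsar.ProducerOptions{
            Topic:       "my-topic", // placeholder
            SendTimeout: 0,          // disable the send timeout, as the tests in this patch do
        })
        if err != nil {
            log.Fatal(err)
        }
        defer producer.Close()

        // Open a transaction with a one-minute timeout.
        txn, err := client.NewTransaction(time.Minute)
        if err != nil {
            log.Fatal(err)
        }

        // Messages sent with the transaction are not visible to consumers until the commit.
        if _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
            Transaction: txn,
            Payload:     []byte("hello"),
        }); err != nil {
            _ = txn.Abort(context.Background())
            log.Fatal(err)
        }

        if err = txn.Commit(context.Background()); err != nil {
            log.Fatal(err)
        }
    }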
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 5322597..9b44987 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -195,6 +195,14 @@ func newClient(options ClientOptions) (Client, error) {
return c, nil
}
+func (c *client) NewTransaction(timeout time.Duration) (Transaction, error) {
+ id, err := c.tcClient.newTransaction(timeout)
+ if err != nil {
+ return nil, err
+ }
+ return newTransaction(*id, c.tcClient, timeout), nil
+}
+
func (c *client) CreateProducer(options ProducerOptions) (Producer, error) {
producer, err := newProducer(c, &options)
if err == nil {
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 64a096d..3ef72c7 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -265,6 +265,9 @@ type Consumer interface {
// AckID the consumption of a single message, identified by its MessageID
AckID(MessageID) error
+ // AckWithTxn the consumption of a single message with a transaction
+ AckWithTxn(Message, Transaction) error
+
// AckCumulative the reception of all the messages in the stream up to (and including)
// the provided message.
AckCumulative(msg Message) error
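A short sketch of acknowledging inside a transaction (assumes a consumer subscribed via client.Subscribe and an open txn as in the sketch above; error handling abbreviated):

    msg, err := consumer.Receive(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    // The acknowledgement only takes effect when the transaction commits;
    // if the transaction is aborted instead, the message remains unacknowledged.
    if err := consumer.AckWithTxn(msg, txn); err != nil {
        _ = txn.Abort(context.Background())
    }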
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index fd7fa57..07f38c3 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -38,6 +38,7 @@ type acker interface {
// AckID does not handle errors returned by the Broker side, so no need to wait for doneCh to finish.
AckID(id MessageID) error
AckIDWithResponse(id MessageID) error
+ AckIDWithTxn(msgID MessageID, txn Transaction) error
AckIDCumulative(msgID MessageID) error
AckIDWithResponseCumulative(msgID MessageID) error
NackID(id MessageID)
@@ -478,6 +479,15 @@ func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
}
}
+func (c *consumer) AckWithTxn(msg Message, txn Transaction) error {
+ msgID := msg.ID()
+ if err := c.checkMsgIDPartition(msgID); err != nil {
+ return err
+ }
+
+ return c.consumers[msgID.PartitionIdx()].AckIDWithTxn(msgID, txn)
+}
+
// Chan return the message chan to users
func (c *consumer) Chan() <-chan ConsumerMessage {
return c.messageCh
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 8108c29..f6630dd 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -143,6 +143,23 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) error {
return mid.consumer.AckID(msgID)
}
+// AckWithTxn the consumption of a single message with a transaction
+func (c *multiTopicConsumer) AckWithTxn(msg Message, txn Transaction) error {
+ msgID := msg.ID()
+ if !checkMessageIDType(msgID) {
+ c.log.Warnf("invalid message id type %T", msgID)
+ return errors.New("invalid message id type in multi_consumer")
+ }
+ mid := toTrackingMessageID(msgID)
+
+ if mid.consumer == nil {
+ c.log.Warnf("unable to ack messageID=%+v can not determine
topic", msgID)
+ return errors.New("unable to ack message because consumer is
nil")
+ }
+
+ return mid.consumer.AckIDWithTxn(msgID, txn)
+}
+
// AckCumulative the reception of all the messages in the stream up to (and including)
// the provided message
func (c *multiTopicConsumer) AckCumulative(msg Message) error {
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index a8ba7b5..6e241d8 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -417,6 +417,118 @@ func (pc *partitionConsumer) Unsubscribe() error {
return req.err
}
+// ackIDCommon handles common logic for acknowledging messages with or without transactions.
+// withTxn should be set to true when dealing with transactions.
+func (pc *partitionConsumer) ackIDCommon(msgID MessageID, withResponse bool, txn Transaction) error {
+ if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+ pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
+ return errors.New("consumer state is closed")
+ }
+
+ if cmid, ok := msgID.(*chunkMessageID); ok {
+ return pc.unAckChunksTracker.ack(cmid)
+ }
+
+ trackingID := toTrackingMessageID(msgID)
+
+ if trackingID != nil && trackingID.ack() {
+ // All messages in the same batch have been acknowledged, we only need to acknowledge the
+ // MessageID that represents the entry that stores the whole batch
+ trackingID = &trackingMessageID{
+ messageID: &messageID{
+ ledgerID: trackingID.ledgerID,
+ entryID: trackingID.entryID,
+ },
+ }
+ pc.metrics.AcksCounter.Inc()
+ pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
+ } else if !pc.options.enableBatchIndexAck {
+ return nil
+ }
+
+ var err error
+ if withResponse {
+ if txn != nil {
+ ackReq := pc.sendIndividualAckWithTxn(trackingID, txn.(*transaction))
+ <-ackReq.doneCh
+ err = ackReq.err
+ } else {
+ ackReq := pc.sendIndividualAck(trackingID)
+ <-ackReq.doneCh
+ err = ackReq.err
+ }
+ } else {
+ pc.ackGroupingTracker.add(trackingID)
+ }
+ pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
+ return err
+}
+
+// AckIDWithTxn acknowledges the consumption of a message with transaction.
+func (pc *partitionConsumer) AckIDWithTxn(msgID MessageID, txn Transaction) error {
+ return pc.ackIDCommon(msgID, true, txn)
+}
+
+// ackID acknowledges the consumption of a message and optionally waits for response from the broker.
+func (pc *partitionConsumer) ackID(msgID MessageID, withResponse bool) error {
+ return pc.ackIDCommon(msgID, withResponse, nil)
+}
+
+func (pc *partitionConsumer) internalAckWithTxn(req *ackWithTxnRequest) {
+ defer close(req.doneCh)
+ if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+ pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
+ req.err = newError(ConsumerClosed, "Failed to ack by closing or closed consumer")
+ return
+ }
+ if req.Transaction.state != TxnOpen {
+ pc.log.WithField("state", req.Transaction.state).Error("Failed to ack by a non-open transaction.")
+ req.err = newError(InvalidStatus, "Failed to ack by a non-open transaction.")
+ return
+ }
+ msgID := req.msgID
+
+ messageIDs := make([]*pb.MessageIdData, 1)
+ messageIDs[0] = &pb.MessageIdData{
+ LedgerId: proto.Uint64(uint64(msgID.ledgerID)),
+ EntryId: proto.Uint64(uint64(msgID.entryID)),
+ }
+ if pc.options.enableBatchIndexAck && msgID.tracker != nil {
+ ackSet := msgID.tracker.toAckSet()
+ if ackSet != nil {
+ messageIDs[0].AckSet = ackSet
+ }
+ }
+
+ reqID := pc.client.rpcClient.NewRequestID()
+ txnID := req.Transaction.GetTxnID()
+ cmdAck := &pb.CommandAck{
+ ConsumerId: proto.Uint64(pc.consumerID),
+ MessageId: messageIDs,
+ AckType: pb.CommandAck_Individual.Enum(),
+ TxnidMostBits: proto.Uint64(txnID.MostSigBits),
+ TxnidLeastBits: proto.Uint64(txnID.LeastSigBits),
+ }
+
+ if err := req.Transaction.registerAckTopic(pc.options.topic, pc.options.subscription); err != nil {
+ req.err = err
+ return
+ }
+
+ if err := req.Transaction.registerSendOrAckOp(); err != nil {
+ req.err = err
+ return
+ }
+
+ cmdAck.RequestId = proto.Uint64(reqID)
+ _, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), reqID, pb.BaseCommand_ACK, cmdAck)
+ if err != nil {
+ pc.log.WithError(err).Error("Ack with response error")
+ }
+ req.Transaction.endSendOrAckOp(err)
+ req.err = err
+}
+
func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
defer close(unsub.doneCh)
@@ -488,47 +600,6 @@ func (pc *partitionConsumer) requestGetLastMessageID() (*trackingMessageID, erro
return convertToMessageID(id), nil
}
-func (pc *partitionConsumer) ackID(msgID MessageID, withResponse bool) error {
- if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
- pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
- return errors.New("consumer state is closed")
- }
-
- if cmid, ok := msgID.(*chunkMessageID); ok {
- return pc.unAckChunksTracker.ack(cmid)
- }
-
- trackingID := toTrackingMessageID(msgID)
-
- if trackingID != nil && trackingID.ack() {
- // All messages in the same batch have been acknowledged, we only need to acknowledge the
- // MessageID that represents the entry that stores the whole batch
- trackingID = &trackingMessageID{
- messageID: &messageID{
- ledgerID: trackingID.ledgerID,
- entryID: trackingID.entryID,
- },
- }
- pc.metrics.AcksCounter.Inc()
- pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
- } else if !pc.options.enableBatchIndexAck {
- return nil
- }
-
- var ackReq *ackRequest
- if withResponse {
- ackReq := pc.sendIndividualAck(trackingID)
- <-ackReq.doneCh
- } else {
- pc.ackGroupingTracker.add(trackingID)
- }
- pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
- if ackReq == nil {
- return nil
- }
- return ackReq.err
-}
-
func (pc *partitionConsumer) sendIndividualAck(msgID MessageID) *ackRequest {
ackReq := &ackRequest{
doneCh: make(chan struct{}),
@@ -539,6 +610,17 @@ func (pc *partitionConsumer) sendIndividualAck(msgID MessageID) *ackRequest {
return ackReq
}
+func (pc *partitionConsumer) sendIndividualAckWithTxn(msgID MessageID, txn *transaction) *ackWithTxnRequest {
+ ackReq := &ackWithTxnRequest{
+ Transaction: txn,
+ doneCh: make(chan struct{}),
+ ackType: individualAck,
+ msgID: *msgID.(*trackingMessageID),
+ }
+ pc.eventsCh <- ackReq
+ return ackReq
+}
+
func (pc *partitionConsumer) AckIDWithResponse(msgID MessageID) error {
if !checkMessageIDType(msgID) {
pc.log.Errorf("invalid message id type %T", msgID)
@@ -1389,6 +1471,14 @@ type ackRequest struct {
err error
}
+type ackWithTxnRequest struct {
+ doneCh chan struct{}
+ msgID trackingMessageID
+ Transaction *transaction
+ ackType int
+ err error
+}
+
type unsubscribeRequest struct {
doneCh chan struct{}
err error
@@ -1444,6 +1534,8 @@ func (pc *partitionConsumer) runEventsLoop() {
switch v := i.(type) {
case *ackRequest:
pc.internalAck(v)
+ case *ackWithTxnRequest:
+ pc.internalAckWithTxn(v)
case []*pb.MessageIdData:
pc.internalAckList(v)
case *redeliveryRequest:
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 2520af5..79e2293 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -193,6 +193,24 @@ func (c *regexConsumer) AckID(msgID MessageID) error {
return mid.consumer.AckID(msgID)
}
+// AckID the consumption of a single message, identified by its MessageID
+func (c *regexConsumer) AckWithTxn(msg Message, txn Transaction) error {
+ msgID := msg.ID()
+ if !checkMessageIDType(msgID) {
+ c.log.Warnf("invalid message id type %T", msgID)
+ return fmt.Errorf("invalid message id type %T", msgID)
+ }
+
+ mid := toTrackingMessageID(msgID)
+
+ if mid.consumer == nil {
+ c.log.Warnf("unable to ack messageID=%+v can not determine
topic", msgID)
+ return errors.New("consumer is nil in consumer_regex")
+ }
+
+ return mid.consumer.AckIDWithTxn(msgID, txn)
+}
+
// AckCumulative the reception of all the messages in the stream up to (and including)
// the provided message.
func (c *regexConsumer) AckCumulative(msg Message) error {
diff --git a/pulsar/helper_for_test.go b/pulsar/helper_for_test.go
index 7bbf66e..426855b 100644
--- a/pulsar/helper_for_test.go
+++ b/pulsar/helper_for_test.go
@@ -159,7 +159,7 @@ func topicStats(topic string) (map[string]interface{}, error) {
func transactionStats(id *TxnID) (map[string]interface{}, error) {
var metadata map[string]interface{}
- path := fmt.Sprintf("admin/v3/transactions/transactionMetadata/%d/%d", id.mostSigBits, id.leastSigBits)
+ path := fmt.Sprintf("admin/v3/transactions/transactionMetadata/%d/%d", id.MostSigBits, id.LeastSigBits)
err := httpGet(path, &metadata)
return metadata, err
}
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 649aba4..6df3a61 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -51,6 +51,9 @@ type BatchBuilder interface {
payload []byte,
callback interface{}, replicateTo []string, deliverAt time.Time,
schemaVersion []byte, multiSchemaEnabled bool,
+ useTxn bool,
+ mostSigBits uint64,
+ leastSigBits uint64,
) bool
// Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
@@ -185,6 +188,7 @@ func (bc *batchContainer) Add(
payload []byte,
callback interface{}, replicateTo []string, deliverAt time.Time,
schemaVersion []byte, multiSchemaEnabled bool,
+ useTxn bool, mostSigBits uint64, leastSigBits uint64,
) bool {
if replicateTo != nil && bc.numMessages != 0 {
@@ -223,6 +227,10 @@ func (bc *batchContainer) Add(
}
bc.cmdSend.Send.SequenceId = proto.Uint64(sequenceID)
+ if useTxn {
+ bc.cmdSend.Send.TxnidMostBits = proto.Uint64(mostSigBits)
+ bc.cmdSend.Send.TxnidLeastBits = proto.Uint64(leastSigBits)
+ }
}
addSingleMessageToBatch(bc.buffer, metadata, payload)
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 00e075b..7471ee0 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -22,11 +22,10 @@ import (
"errors"
"fmt"
- "google.golang.org/protobuf/proto"
-
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
"github.com/apache/pulsar-client-go/pulsar/internal/crypto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+ "google.golang.org/protobuf/proto"
)
const (
@@ -332,7 +331,10 @@ func SingleSend(wb Buffer,
msgMetadata *pb.MessageMetadata,
compressedPayload Buffer,
encryptor crypto.Encryptor,
- maxMassageSize uint32) error {
+ maxMassageSize uint32,
+ useTxn bool,
+ mostSigBits uint64,
+ leastSigBits uint64) error {
cmdSend := baseCommand(
pb.BaseCommand_SEND,
&pb.CommandSend{
@@ -344,6 +346,10 @@ func SingleSend(wb Buffer,
isChunk := true
cmdSend.Send.IsChunk = &isChunk
}
+ if useTxn {
+ cmdSend.Send.TxnidMostBits = proto.Uint64(mostSigBits)
+ cmdSend.Send.TxnidLeastBits = proto.Uint64(leastSigBits)
+ }
// payload has been compressed so compressionProvider can be nil
return serializeMessage(wb, cmdSend, msgMetadata, compressedPayload, nil, encryptor, maxMassageSize, false)
diff --git a/pulsar/internal/http_client.go b/pulsar/internal/http_client.go
index c3b38f2..2d440d6 100644
--- a/pulsar/internal/http_client.go
+++ b/pulsar/internal/http_client.go
@@ -190,7 +190,9 @@ func (c *httpClient) GetWithOptions(endpoint string, obj interface{}, params map
req.params = query
}
- resp, err := checkSuccessful(c.doRequest(req))
+ doRequest, err := c.doRequest(req)
+ defer safeRespClose(doRequest)
+ resp, err := checkSuccessful(doRequest, err)
if err != nil {
return nil, err
}
diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go
index 334e674..88a4d5e 100644
--- a/pulsar/internal/key_based_batch_builder.go
+++ b/pulsar/internal/key_based_batch_builder.go
@@ -132,6 +132,9 @@ func (bc *keyBasedBatchContainer) Add(
payload []byte,
callback interface{}, replicateTo []string, deliverAt time.Time,
schemaVersion []byte, multiSchemaEnabled bool,
+ useTxn bool,
+ mostSigBits uint64,
+ leastSigBits uint64,
) bool {
if replicateTo != nil && bc.numMessages != 0 {
// If the current batch is not empty and we're trying to set the replication clusters,
@@ -162,7 +165,7 @@ func (bc *keyBasedBatchContainer) Add(
add := batchPart.Add(
metadata, sequenceIDGenerator, payload, callback, replicateTo, deliverAt,
- schemaVersion, multiSchemaEnabled,
+ schemaVersion, multiSchemaEnabled, useTxn, mostSigBits, leastSigBits,
)
if !add {
return false
diff --git a/pulsar/internal/pulsartracing/consumer_interceptor_test.go b/pulsar/internal/pulsartracing/consumer_interceptor_test.go
index 34e09d5..06c9a58 100644
--- a/pulsar/internal/pulsartracing/consumer_interceptor_test.go
+++ b/pulsar/internal/pulsartracing/consumer_interceptor_test.go
@@ -52,6 +52,10 @@ func (c *mockConsumer) Subscription() string {
return ""
}
+func (c *mockConsumer) AckWithTxn(msg pulsar.Message, txn pulsar.Transaction) error {
+ return nil
+}
+
func (c *mockConsumer) Unsubscribe() error {
return nil
}
diff --git a/pulsar/message.go b/pulsar/message.go
index 98190e9..83afd3f 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -69,6 +69,10 @@ type ProducerMessage struct {
//Schema assign to the current message
//Note: messages may have a different schema from producer schema, use it instead of producer schema when assigned
Schema Schema
+
+ //Transaction assign to the current message
+ //Note: The message is not visible before the transaction is committed.
+ Transaction Transaction
}
// Message abstraction used in Pulsar
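Together with Consumer.AckWithTxn, this enables an atomic consume-transform-produce flow, as exercised by TestAckAndSendWithTxn later in this patch; a sketch (sourceConsumer and sinkProducer are assumed to have been created beforehand; error handling abbreviated):

    txn, err := client.NewTransaction(time.Minute)
    if err != nil {
        log.Fatal(err)
    }
    msg, err := sourceConsumer.Receive(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    if _, err = sinkProducer.Send(context.Background(), &pulsar.ProducerMessage{
        Transaction: txn,
        Payload:     msg.Payload(),
    }); err != nil {
        _ = txn.Abort(context.Background())
        log.Fatal(err)
    }
    if err = sourceConsumer.AckWithTxn(msg, txn); err != nil {
        _ = txn.Abort(context.Background())
        log.Fatal(err)
    }
    // Either the produced message and the acknowledgement both take effect, or neither does.
    if err = txn.Commit(context.Background()); err != nil {
        log.Fatal(err)
    }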
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 744df79..e77c929 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -663,6 +663,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
chunkID: chunkID,
uuid: uuid,
chunkRecorder: cr,
+ transaction: request.transaction,
}
// the permit of first chunk has acquired
if chunkID != 0 && !p.canAddToQueue(nsr, 0) {
@@ -681,16 +682,16 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
} else {
smm := p.genSingleMessageMetadataInBatch(msg, uncompressedSize)
multiSchemaEnabled := !p.options.DisableMultiSchema
- added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
- msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled)
+ added := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
+ multiSchemaEnabled)
if !added {
- // The current batch is full.. flush it and retry
+ // The current batch is full. flush it and retry
p.internalFlushCurrentBatch()
// after flushing try again to add the current payload
- if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
- msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled); !ok {
+ if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
+ multiSchemaEnabled); !ok {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
request.callback(nil, request.msg, errFailAddToBatch)
p.log.WithField("size", uncompressedSize).
@@ -707,6 +708,23 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
}
}
+func addRequestToBatch(smm *pb.SingleMessageMetadata, p *partitionProducer,
+ uncompressedPayload []byte,
+ request *sendRequest, msg *ProducerMessage, deliverAt time.Time,
+ schemaVersion []byte, multiSchemaEnabled bool) bool {
+ var ok bool
+ if request.transaction != nil {
+ txnID := request.transaction.GetTxnID()
+ ok = p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+ msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled, true, txnID.MostSigBits,
+ txnID.LeastSigBits)
+ } else {
+ ok = p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
+ msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled, false, 0, 0)
+ }
+ return ok
+}
+
func (p *partitionProducer) genMetadata(msg *ProducerMessage,
uncompressedSize int,
deliverAt time.Time) (mm *pb.MessageMetadata) {
@@ -789,16 +807,36 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
}
sid := *mm.SequenceId
-
- if err := internal.SingleSend(
- buffer,
- p.producerID,
- sid,
- mm,
- payloadBuf,
- p.encryptor,
- maxMessageSize,
- ); err != nil {
+ var err error
+ if request.transaction != nil {
+ txnID := request.transaction.GetTxnID()
+ err = internal.SingleSend(
+ buffer,
+ p.producerID,
+ sid,
+ mm,
+ payloadBuf,
+ p.encryptor,
+ maxMessageSize,
+ true,
+ txnID.MostSigBits,
+ txnID.LeastSigBits,
+ )
+ } else {
+ err = internal.SingleSend(
+ buffer,
+ p.producerID,
+ sid,
+ mm,
+ payloadBuf,
+ p.encryptor,
+ maxMessageSize,
+ false,
+ 0,
+ 0,
+ )
+ }
+ if err != nil {
request.callback(nil, request.msg, err)
p.releaseSemaphoreAndMem(int64(len(msg.Payload)))
p.log.WithError(err).Errorf("Single message serialize failed %s", msg.Value)
@@ -952,6 +990,9 @@ func (p *partitionProducer) failTimeoutMessages() {
sr.callback(nil, sr.msg, errSendTimeout)
})
}
+ if sr.transaction != nil {
+ sr.transaction.endSendOrAckOp(nil)
+ }
}
// flag the send has completed with error, flush make no effect
@@ -1067,9 +1108,34 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
+ //Register transaction operation to transaction and the transaction coordinator.
+ var newCallback func(MessageID, *ProducerMessage, error)
+ if msg.Transaction != nil {
+ transactionImpl := (msg.Transaction).(*transaction)
+ if transactionImpl.state != TxnOpen {
+ p.log.WithField("state",
transactionImpl.state).Error("Failed to send message" +
+ " by a non-open transaction.")
+ callback(nil, msg, newError(InvalidStatus, "Failed to
send message by a non-open transaction."))
+ return
+ }
+
+ if err := transactionImpl.registerProducerTopic(p.topic); err != nil {
+ callback(nil, msg, err)
+ return
+ }
+ if err := transactionImpl.registerSendOrAckOp(); err != nil {
+ callback(nil, msg, err)
+ }
+ newCallback = func(id MessageID, producerMessage *ProducerMessage, err error) {
+ callback(id, producerMessage, err)
+ transactionImpl.endSendOrAckOp(err)
+ }
+ } else {
+ newCallback = callback
+ }
if p.getProducerState() != producerReady {
// Producer is closing
- callback(nil, msg, errProducerClosed)
+ newCallback(nil, msg, errProducerClosed)
return
}
@@ -1078,16 +1144,20 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
// callbackOnce make sure the callback is only invoked once in chunking
callbackOnce := &sync.Once{}
-
+ var txn *transaction
+ if msg.Transaction != nil {
+ txn = (msg.Transaction).(*transaction)
+ }
sr := &sendRequest{
ctx: ctx,
msg: msg,
- callback: callback,
+ callback: newCallback,
callbackOnce: callbackOnce,
flushImmediately: flushImmediately,
publishTime: time.Now(),
blockCh: bc,
closeBlockChOnce: &sync.Once{},
+ transaction: txn,
}
p.options.Interceptors.BeforeSend(p, msg)
@@ -1191,6 +1261,9 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
}
}
+ if sr.transaction != nil {
+ sr.transaction.endSendOrAckOp(nil)
+ }
}
// Mark this pending item as done
@@ -1287,6 +1360,7 @@ type sendRequest struct {
chunkID int
uuid string
chunkRecorder *chunkRecorder
+ transaction *transaction
}
// stopBlock can be invoked multiple times safety
diff --git a/pulsar/transaction.go b/pulsar/transaction.go
index 60e1d2b..944c7e3 100644
--- a/pulsar/transaction.go
+++ b/pulsar/transaction.go
@@ -49,10 +49,10 @@ const (
// TxnID An identifier for representing a transaction.
type TxnID struct {
- // mostSigBits The most significant 64 bits of this TxnID.
- mostSigBits uint64
- // leastSigBits The least significant 64 bits of this TxnID.
- leastSigBits uint64
+ // MostSigBits The most significant 64 bits of this TxnID.
+ MostSigBits uint64
+ // LeastSigBits The least significant 64 bits of this TxnID.
+ LeastSigBits uint64
}
// Transaction used to guarantee exactly-once
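Since the TxnID bits are now exported, application code can log or correlate a transaction's identity; a small sketch (assumes the transaction's TxnID is obtained via GetTxnID, which is how the implementation in this patch reads it internally):

    id := txn.GetTxnID()
    log.Printf("operating in transaction %d:%d", id.MostSigBits, id.LeastSigBits)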
diff --git a/pulsar/transaction_coordinator_client.go b/pulsar/transaction_coordinator_client.go
index 1535fad..96cca87 100644
--- a/pulsar/transaction_coordinator_client.go
+++ b/pulsar/transaction_coordinator_client.go
@@ -135,11 +135,11 @@ func (tc *transactionCoordinatorClient) addPublishPartitionToTxn(id *TxnID, part
requestID := tc.client.rpcClient.NewRequestID()
cmdAddPartitions := &pb.CommandAddPartitionToTxn{
RequestId: proto.Uint64(requestID),
- TxnidMostBits: proto.Uint64(id.mostSigBits),
- TxnidLeastBits: proto.Uint64(id.leastSigBits),
+ TxnidMostBits: proto.Uint64(id.MostSigBits),
+ TxnidLeastBits: proto.Uint64(id.LeastSigBits),
Partitions: partitions,
}
- res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], requestID,
+ res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.MostSigBits], requestID,
pb.BaseCommand_ADD_PARTITION_TO_TXN, cmdAddPartitions)
tc.semaphore.Release()
if err != nil {
@@ -163,11 +163,11 @@ func (tc *transactionCoordinatorClient) addSubscriptionToTxn(id *TxnID, topic st
}
cmdAddSubscription := &pb.CommandAddSubscriptionToTxn{
RequestId: proto.Uint64(requestID),
- TxnidMostBits: proto.Uint64(id.mostSigBits),
- TxnidLeastBits: proto.Uint64(id.leastSigBits),
+ TxnidMostBits: proto.Uint64(id.MostSigBits),
+ TxnidLeastBits: proto.Uint64(id.LeastSigBits),
Subscription: []*pb.Subscription{sub},
}
- res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], requestID,
+ res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.MostSigBits], requestID,
pb.BaseCommand_ADD_SUBSCRIPTION_TO_TXN, cmdAddSubscription)
tc.semaphore.Release()
if err != nil {
@@ -187,10 +187,10 @@ func (tc *transactionCoordinatorClient) endTxn(id *TxnID, action pb.TxnAction) e
cmdEndTxn := &pb.CommandEndTxn{
RequestId: proto.Uint64(requestID),
TxnAction: &action,
- TxnidMostBits: proto.Uint64(id.mostSigBits),
- TxnidLeastBits: proto.Uint64(id.leastSigBits),
+ TxnidMostBits: proto.Uint64(id.MostSigBits),
+ TxnidLeastBits: proto.Uint64(id.LeastSigBits),
}
- res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.mostSigBits], requestID, pb.BaseCommand_END_TXN, cmdEndTxn)
+ res, err := tc.client.rpcClient.RequestOnCnx(tc.cons[id.MostSigBits], requestID, pb.BaseCommand_END_TXN, cmdEndTxn)
tc.semaphore.Release()
if err != nil {
return err
diff --git a/pulsar/transaction_impl.go b/pulsar/transaction_impl.go
index 7cc93ec..8a24c46 100644
--- a/pulsar/transaction_impl.go
+++ b/pulsar/transaction_impl.go
@@ -231,6 +231,8 @@ func (state TxnState) string() string {
return "TxnAborted"
case TxnTimeout:
return "TxnTimeout"
+ case TxnError:
+ return "TxnError"
default:
return "Unknown"
}
diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go
index 362e4d2..66a82cc 100644
--- a/pulsar/transaction_test.go
+++ b/pulsar/transaction_test.go
@@ -103,8 +103,8 @@ func TestTxnImplCommitOrAbort(t *testing.T) {
//The operations of committing txn1 should success at the first time and fail at the second time.
txn1 := createTxn(tc, t)
err := txn1.Commit(context.Background())
- require.Nil(t, err, fmt.Sprintf("Failed to commit the transaction %d:%d\n", txn1.txnID.mostSigBits,
- txn1.txnID.leastSigBits))
+ require.Nil(t, err, fmt.Sprintf("Failed to commit the transaction %d:%d\n", txn1.txnID.MostSigBits,
+ txn1.txnID.LeastSigBits))
txn1.state = TxnOpen
txn1.opsFlow <- true
err = txn1.Commit(context.Background())
@@ -117,7 +117,7 @@ func TestTxnImplCommitOrAbort(t *testing.T) {
txn2 := newTransaction(*id2, tc, time.Hour)
err = txn2.Abort(context.Background())
require.Nil(t, err, fmt.Sprintf("Failed to abort the transaction %d:%d\n",
- id2.mostSigBits, id2.leastSigBits))
+ id2.MostSigBits, id2.LeastSigBits))
txn2.state = TxnOpen
txn2.opsFlow <- true
err = txn2.Abort(context.Background())
@@ -209,11 +209,237 @@ func createTcClient(t *testing.T) (*transactionCoordinatorClient, *client) {
URL: webServiceURLTLS,
TLSTrustCertsFilePath: caCertsPath,
Authentication: NewAuthenticationTLS(tlsClientCertPath, tlsClientKeyPath),
+ EnableTransaction: true,
})
require.Nil(t, err, "Failed to create client.")
- tcClient := newTransactionCoordinatorClientImpl(c.(*client))
- err = tcClient.start()
- require.Nil(t, err, "Failed to start transaction coordinator.")
- return tcClient, c.(*client)
+ return c.(*client).tcClient, c.(*client)
+}
+
+// TestConsumeAndProduceWithTxn is a test function that validates the behavior of producing and consuming
+// messages with and without transactions. It consists of the following steps:
+//
+// 1. Prepare: Create a PulsarClient and initialize the transaction coordinator client.
+// 2. Prepare: Create a topic and a subscription.
+// 3. Produce 10 messages with a transaction and 10 messages without a transaction.
+// - Expectation: The consumer should be able to receive the 10 messages sent without a transaction,
+// but not the 10 messages sent with the transaction.
+// 4. Commit the transaction and receive the remaining 10 messages.
+// - Expectation: The consumer should be able to receive the 10 messages sent with the transaction.
+// 5. Clean up: Close the consumer and producer instances.
+//
+// The test ensures that the consumer can only receive messages sent with a transaction after it is committed,
+// and that it can always receive messages sent without a transaction.
+func TestConsumeAndProduceWithTxn(t *testing.T) {
+ // Step 1: Prepare - Create PulsarClient and initialize the transaction coordinator client.
+ topic := newTopicName()
+ sub := "my-sub"
+ _, client := createTcClient(t)
+ // Step 2: Prepare - Create Topic and Subscription.
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ })
+ assert.NoError(t, err)
+ producer, _ := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ SendTimeout: 0,
+ })
+ // Step 3: Open a transaction, send 10 messages with the transaction and 10 messages without the transaction.
+ // Expectation: We can receive the 10 messages sent without a transaction and
+ // cannot receive the 10 messages sent with the transaction.
+ txn, err := client.NewTransaction(time.Hour)
+ require.Nil(t, err)
+ for i := 0; i < 10; i++ {
+ _, err = producer.Send(context.Background(), &ProducerMessage{
+ Payload: make([]byte, 1024),
+ })
+ require.Nil(t, err)
+ }
+ for i := 0; i < 10; i++ {
+ _, err := producer.Send(context.Background(), &ProducerMessage{
+ Transaction: txn,
+ Payload: make([]byte, 1024),
+ })
+ require.Nil(t, err)
+ }
+ // Attempt to receive and acknowledge the 10 messages sent without a transaction.
+ for i := 0; i < 10; i++ {
+ msg, _ := consumer.Receive(context.Background())
+ assert.NotNil(t, msg)
+ err = consumer.Ack(msg)
+ assert.Nil(t, err)
+ }
+ // Create a goroutine to attempt receiving a message and send it to the 'done1' channel.
+ done1 := make(chan Message)
+ go func() {
+ msg, _ := consumer.Receive(context.Background())
+ err := consumer.AckID(msg.ID())
+ require.Nil(t, err)
+ close(done1)
+ }()
+ // Expectation: The consumer should not receive uncommitted messages.
+ select {
+ case <-done1:
+ require.Fail(t, "The consumer should not receive uncommitted
message")
+ case <-time.After(time.Second):
+ }
+ // Step 4: After committing the transaction, we should be able to receive the remaining 10 messages.
+ // Acknowledge the rest of the 10 messages with the transaction.
+ // Expectation: After committing the transaction, all messages of the subscription will be acknowledged.
+ _ = txn.Commit(context.Background())
+ txn, err = client.NewTransaction(time.Hour)
+ require.Nil(t, err)
+ for i := 0; i < 9; i++ {
+ msg, _ := consumer.Receive(context.Background())
+ require.NotNil(t, msg)
+ err = consumer.AckWithTxn(msg, txn)
+ require.Nil(t, err)
+ }
+ _ = txn.Commit(context.Background())
+ <-done1
+ consumer.Close()
+ consumer, _ = client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ })
+ // Create a goroutine to attempt receiving a message and send it to the 'done1' channel.
+ done2 := make(chan Message)
+ go func() {
+ consumer.Receive(context.Background())
+ close(done2)
+ }()
+
+ // Expectation: The consumer should not receive uncommitted messages.
+ select {
+ case <-done2:
+ require.Fail(t, "The consumer should not receive messages")
+ case <-time.After(time.Second):
+ }
+
+ // Step 5: Clean up - Close the consumer and producer instances.
+ consumer.Close()
+ producer.Close()
+}
+
+func TestAckAndSendWithTxn(t *testing.T) {
+ // Prepare: Create PulsarClient and initialize the transaction coordinator client.
+ sourceTopic := newTopicName()
+ sinkTopic := newTopicName()
+ sub := "my-sub"
+ _, client := createTcClient(t)
+
+ // Prepare: Create source and sink topics and subscriptions.
+ sourceConsumer, _ := client.Subscribe(ConsumerOptions{
+ Topic: sourceTopic,
+ SubscriptionName: sub,
+ })
+ sourceProducer, _ := client.CreateProducer(ProducerOptions{
+ Topic: sourceTopic,
+ SendTimeout: 0,
+ })
+ sinkConsumer, _ := client.Subscribe(ConsumerOptions{
+ Topic: sinkTopic,
+ SubscriptionName: sub,
+ })
+ sinkProducer, _ := client.CreateProducer(ProducerOptions{
+ Topic: sinkTopic,
+ SendTimeout: 0,
+ })
+
+ // Produce 10 messages to the source topic.
+ for i := 0; i < 10; i++ {
+ _, err := sourceProducer.Send(context.Background(), &ProducerMessage{
+ Payload: make([]byte, 1024),
+ })
+ require.Nil(t, err)
+ }
+
+ // Open a transaction and consume messages from the source topic while sending messages to the sink topic.
+ txn, err := client.NewTransaction(time.Hour)
+ require.Nil(t, err)
+
+ for i := 0; i < 10; i++ {
+ msg, _ := sourceConsumer.Receive(context.Background())
+ require.NotNil(t, msg)
+
+ payload := msg.Payload()
+ _, err := sinkProducer.Send(context.Background(), &ProducerMessage{
+ Transaction: txn,
+ Payload: payload,
+ })
+ require.Nil(t, err)
+
+ err = sourceConsumer.AckWithTxn(msg, txn)
+ require.Nil(t, err)
+ }
+
+ // Commit the transaction.
+ _ = txn.Commit(context.Background())
+
+ // Verify that the messages are available in the sink topic.
+ for i := 0; i < 10; i++ {
+ msg, _ := sinkConsumer.Receive(context.Background())
+ require.NotNil(t, msg)
+ err = sinkConsumer.Ack(msg)
+ require.Nil(t, err)
+ }
+
+ // Clean up: Close consumers and producers.
+ sourceConsumer.Close()
+ sourceProducer.Close()
+ sinkConsumer.Close()
+ sinkProducer.Close()
+}
+
+func TestTransactionAbort(t *testing.T) {
+ // Prepare: Create PulsarClient and initialize the transaction coordinator client.
+ topic := newTopicName()
+ sub := "my-sub"
+ _, client := createTcClient(t)
+
+ // Prepare: Create Topic and Subscription.
+ consumer, _ := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sub,
+ })
+ producer, _ := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ SendTimeout: 0,
+ })
+
+ // Open a transaction and send 10 messages with the transaction.
+ txn, err := client.NewTransaction(time.Hour)
+ require.Nil(t, err)
+
+ for i := 0; i < 10; i++ {
+ _, err = producer.Send(context.Background(), &ProducerMessage{
+ Transaction: txn,
+ Payload: make([]byte, 1024),
+ })
+ require.Nil(t, err)
+ }
+
+ // Abort the transaction.
+ _ = txn.Abort(context.Background())
+
+ // Expectation: The consumer should not receive any messages.
+ done := make(chan struct{})
+ go func() {
+ _, err := consumer.Receive(context.Background())
+ if err != nil {
+ close(done)
+ }
+ }()
+
+ select {
+ case <-done:
+ // The consumer should not receive any messages.
+ require.Fail(t, "The consumer should not receive any messages")
+ case <-time.After(time.Second):
+ }
+
+ // Clean up: Close the consumer and producer instances.
+ consumer.Close()
+ producer.Close()
}