BewareMyPower commented on code in PR #1237:
URL: https://github.com/apache/pulsar-client-go/pull/1237#discussion_r1673377675
##########
pulsar/transaction_coordinator_client.go:
##########
@@ -20,23 +20,322 @@ package pulsar
import (
"context"
"strconv"
+ "strings"
"sync/atomic"
"time"
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
+ "github.com/pkg/errors"
+ uAtomic "go.uber.org/atomic"
"google.golang.org/protobuf/proto"
)
type transactionCoordinatorClient struct {
client *client
- cons []internal.Connection
+ handlers []*transactionHandler
epoch uint64
semaphore internal.Semaphore
log log.Logger
}
+type transactionHandler struct {
+ tc *transactionCoordinatorClient
+ state uAtomic.Int32
+ conn uAtomic.Value
+ partition uint64
+ closeCh chan any
+ requestCh chan any
+ connectClosedCh chan *connectionClosed
+ log log.Logger
+}
+
+type txnHandlerState int
+
+const (
+ txnHandlerReady = iota
+ txnHandlerClosed
+)
+
+func (t *transactionHandler) getState() txnHandlerState {
+ return txnHandlerState(t.state.Load())
+}
+
+func (t *transactionHandler) setState(state txnHandlerState) {
+ t.state.Store(int32(state))
+}
+
+func (tc *transactionCoordinatorClient) newTransactionHandler(partition
uint64) (*transactionHandler, error) {
+ handler := &transactionHandler{
+ tc: tc,
+ partition: partition,
+ closeCh: make(chan any),
+ requestCh: make(chan any),
+ connectClosedCh: make(chan *connectionClosed),
+ log: tc.log.SubLogger(log.Fields{"txn handler
partition": partition}),
+ }
+ err := handler.grabConn()
+ if err != nil {
+ return nil, err
+ }
+ go handler.runEventsLoop()
+ return handler, nil
+}
+
+func (t *transactionHandler) grabConn() error {
+ lr, err :=
t.tc.client.lookupService.Lookup(getTCAssignTopicName(t.partition))
+ if err != nil {
+ t.log.WithError(err).Warn("Failed to lookup the
transaction_impl " +
+ "coordinator assign topic [" +
strconv.FormatUint(t.partition, 10) + "]")
+ return err
+ }
+
+ requestID := t.tc.client.rpcClient.NewRequestID()
+ cmdTCConnect := pb.CommandTcClientConnectRequest{
+ RequestId: proto.Uint64(requestID),
+ TcId: proto.Uint64(t.partition),
+ }
+
+ res, err := t.tc.client.rpcClient.Request(lr.LogicalAddr,
lr.PhysicalAddr, requestID,
+ pb.BaseCommand_TC_CLIENT_CONNECT_REQUEST, &cmdTCConnect)
+
+ if err != nil {
+ t.log.WithError(err).Error("Failed to connect transaction_impl
coordinator " +
+ strconv.FormatUint(t.partition, 10))
+ return err
+ }
+
+ go func() {
+ select {
+ case <-t.closeCh:
+ return
+ case <-res.Cnx.WaitForClose():
+ t.connectClosedCh <- &connectionClosed{}
+ }
+ }()
+ t.conn.Store(res.Cnx)
+ t.log.Infof("Transaction handler with transaction coordinator id %d
connected", t.partition)
+ return nil
+}
+
+func (t *transactionHandler) getConn() internal.Connection {
+ return t.conn.Load().(internal.Connection)
+}
+
+func (t *transactionHandler) runEventsLoop() {
+ for {
+ select {
+ case <-t.closeCh:
+ return
+ case req := <-t.requestCh:
+ switch r := req.(type) {
+ case *newTxnOp:
+ t.newTransaction(r)
+ case *addPublishPartitionOp:
+ t.addPublishPartitionToTxn(r)
+ case *addSubscriptionOp:
+ t.addSubscriptionToTxn(r)
+ case *endTxnOp:
+ t.endTxn(r)
+ }
+ case <-t.connectClosedCh:
+ t.log.Infof("Transaction handler %d will reconnect to
the transaction coordinator", t.partition)
+ t.reconnectToBroker()
+ }
+ }
+}
+
+func (t *transactionHandler) reconnectToBroker() {
+ var delayReconnectTime time.Duration
+ var defaultBackoff = internal.DefaultBackoff{}
+
+ for {
+ if t.getState() == txnHandlerClosed {
+ // The handler is already closing
+ t.log.Info("transaction handler is closed, exit
reconnect")
+ return
+ }
+
+ delayReconnectTime = defaultBackoff.Next()
+
+ t.log.WithFields(log.Fields{
+ "delayReconnectTime": delayReconnectTime,
+ }).Info("Reconnecting to broker")
+ time.Sleep(delayReconnectTime)
+
+ // double check
+ if t.getState() == txnHandlerClosed {
+ // Txn handler is already closing
+ t.log.Info("transaction handler is closed, exit
reconnect")
+ return
+ }
+
+ err := t.grabConn()
+ if err == nil {
+ // Successfully reconnected
+ t.log.Info("Reconnected transaction handler to broker")
+ return
+ }
+ t.log.WithError(err).Error("Failed to create transaction
handler at reconnect")
+ errMsg := err.Error()
+ if strings.Contains(errMsg, errMsgTopicNotFound) {
+ // when topic is deleted, we should give up
reconnection.
+ t.log.Warn("Topic Not Found.")
+ break
+ }
+ }
+}
+
+func (t *transactionHandler) checkRetriableError(err error, op any) bool {
+ if err != nil && errors.Is(err, internal.ErrConnectionClosed) {
+ // We are in the EventLoop here, so we need to insert the
request back to the requestCh asynchronously.
+ go func() {
+ t.requestCh <- op
+ }()
+ return true
+ }
+ return false
+}
+
+type newTxnOp struct {
+ // Request
+ timeout time.Duration
+
+ // Response
+ done chan any
+ err error
+ txnID *TxnID
+}
+
+func (t *transactionHandler) newTransaction(op *newTxnOp) {
+ requestID := t.tc.client.rpcClient.NewRequestID()
+ nextTcID := t.tc.nextTCNumber()
+ cmdNewTxn := &pb.CommandNewTxn{
+ RequestId: proto.Uint64(requestID),
+ TcId: proto.Uint64(nextTcID),
+ TxnTtlSeconds: proto.Uint64(uint64(op.timeout.Milliseconds())),
+ }
+ res, err := t.tc.client.rpcClient.RequestOnCnx(t.getConn(), requestID,
pb.BaseCommand_NEW_TXN, cmdNewTxn)
+ if t.checkRetriableError(err, op) {
+ return
+ }
+ defer close(op.done)
+ if err != nil {
+ op.err = err
+ } else if res.Response.NewTxnResponse.Error != nil {
+ op.err =
getErrorFromServerError(res.Response.NewTxnResponse.Error)
+ } else {
+ op.txnID = &TxnID{*res.Response.NewTxnResponse.TxnidMostBits,
+ *res.Response.NewTxnResponse.TxnidLeastBits}
+ }
+}
+
+type addPublishPartitionOp struct {
+ // Request
+ id *TxnID
+ partitions []string
+
+ // Response
+ done chan any
+ err error
Review Comment:
Why not use `chan error` as the response? Then you can replace
`close(op.done)` with
```go
if err != nil {
	op.errorCh <- err
} else if res.Response.AddPartitionToTxnResponse.Error != nil {
	op.errorCh <- getErrorFromServerError(res.Response.AddPartitionToTxnResponse.Error)
} else {
	op.errorCh <- nil
}
```
and you just need to return the value from the channel:
```go
return <-op.errorCh
```
The same question applies to the other `xxxOp` structs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]