smiletrl edited a comment on issue #668:
URL: 
https://github.com/apache/rocketmq-client-go/issues/668#issuecomment-871201564


   @ThreeBearsDan  This project moves pretty slowly on new pull requests.
   
   Also, it is sometimes a bad idea to depend on the mq transaction message 
directly. If mq is temporarily down, some of the most critical business 
features go down with it, just because the transaction message cannot be 
sent. For example, the order service breaks completely if it cannot send the 
order paid/canceled transaction message. Another problem case is when the 
transaction message content depends on data committed by the local 
transaction.
   
   Here's one solution that still makes use of the transaction message.
   
   To keep strong consistency, instead of moving all the business logic into 
the transaction listener's `ExecuteLocalTransaction`, the business logic stays 
in its original place. Once the local transaction has succeeded, the mq 
producer can subscribe to the local database change and send a transaction 
message. For example, if an order's status has changed from `pending_payment` 
to `success_paid`, a new mq message should be sent out.
   
   It might be too much work to write a subscription system that follows 
database binlog changes. So the idea is to add a middle table, 
`mq_pending_messages`, like this:
   
   ```
   id, tag, topic, property, created_at
   
   1, order_paid, order, {order_id: 23, user_id: 12}, 2021-02-12
   ```
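
As a rough illustration of the table above, here is a minimal sketch of a Go
type for one row and a helper that encodes the `property` column as JSON. The
names `PendingMessage`, `OrderPaidProperty` and `NewPendingMessage` are
hypothetical and not part of the original code; the `id` is assumed to be
assigned by the database on insert.

```go
package main

import (
	"encoding/json"
	"time"
)

// OrderPaidProperty is a hypothetical payload for the `property` column.
type OrderPaidProperty struct {
	OrderID int64 `json:"order_id"`
	UserID  int64 `json:"user_id"`
}

// PendingMessage mirrors one row of mq_pending_messages.
// ID stays zero here; it is assumed to be filled in by the database.
type PendingMessage struct {
	ID        int64
	Tag       string
	Topic     string
	Property  json.RawMessage
	CreatedAt time.Time
}

// NewPendingMessage encodes payload as JSON and builds the row that would be
// inserted inside the same local transaction as the business change.
func NewPendingMessage(tag, topic string, payload interface{}) (PendingMessage, error) {
	raw, err := json.Marshal(payload)
	if err != nil {
		return PendingMessage{}, err
	}
	return PendingMessage{
		Tag:       tag,
		Topic:     topic,
		Property:  raw,
		CreatedAt: time.Now(),
	}, nil
}
```

Calling `NewPendingMessage("order_paid", "order", OrderPaidProperty{OrderID: 23, UserID: 12})`
would produce a row matching the example record above, apart from the id and
timestamp.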
    
   When the local transaction makes the order paid change, it also inserts a 
new record into the above table within the same local transaction. Column 
`property` is a json field holding the message fields to be sent out. The mq 
transaction message also carries the id of this record, for example as a 
`pending_id` field.
   
   Then send a transaction message based on the record from the above table. 
The transaction message's `ExecuteLocalTransaction` deletes that record, and 
`CheckLocalTransaction` looks up the middle table `mq_pending_messages` to see 
whether the record with the message's `pending_id` has been deleted.
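
The listener side described above could look roughly like the sketch below.
It does not import rocketmq-client-go: `TxState`, `PendingStore`,
`PendingListener` and `MemStore` are hypothetical stand-ins so the example
stays self-contained. In the real client the two methods would take the
message and return the client's own local transaction state. Returning
`Unknown` while the row still exists is an assumption of this sketch, not
something the comment specifies.

```go
package main

import "sync"

// TxState is a local stand-in for the client's local transaction state.
type TxState int

const (
	Commit TxState = iota
	Rollback
	Unknown
)

// PendingStore is a hypothetical abstraction over mq_pending_messages.
type PendingStore interface {
	Delete(id int64) error
	Exists(id int64) (bool, error)
}

// PendingListener resolves a transaction message from its pending_id.
type PendingListener struct {
	Store PendingStore
}

// ExecuteLocalTransaction deletes the pending row and commits the message.
// If the delete fails, the state is left Unknown so the broker checks back.
func (l PendingListener) ExecuteLocalTransaction(pendingID int64) TxState {
	if err := l.Store.Delete(pendingID); err != nil {
		return Unknown
	}
	return Commit
}

// CheckLocalTransaction commits only when the pending row is gone.
func (l PendingListener) CheckLocalTransaction(pendingID int64) TxState {
	exists, err := l.Store.Exists(pendingID)
	if err != nil || exists {
		return Unknown
	}
	return Commit
}

// MemStore is an in-memory PendingStore used only for illustration.
type MemStore struct {
	mu   sync.Mutex
	rows map[int64]bool
}

func NewMemStore() *MemStore { return &MemStore{rows: map[int64]bool{}} }

// Add simulates the insert done inside the business transaction.
func (s *MemStore) Add(id int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.rows[id] = true
}

// Delete removes the row; deleting a missing row is not an error.
func (s *MemStore) Delete(id int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.rows, id)
	return nil
}

// Exists reports whether the row is still present.
func (s *MemStore) Exists(id int64) (bool, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.rows[id], nil
}
```

What to do when a check-back still finds the row (retry the delete, or keep
answering unknown) is a design decision the comment leaves open.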
   
   Here's a code sample:
   
   ```
   func (c creation) defaultAfterSuccessPayment() Invoker {
        return func(ctx Context, order *entity.Order) error {
                // other local transaction
                // ...
   
                // now set order status to be paid
                status := constants.GetOrderPaidStatus(c.Method())
                tx := ctx.TX()
                if err := tx.Model(entity.Order{}).
                        Where("id = ?", order.ID).
                        Update("status", status).Error; err != nil {
                        return errors.Wrapf(err, "error updating order: %d status to be status: %s after success payment", order.ID, status)
                }
   
                // create a new record in table `mq_pending_messages`
                property := OrderPaidMessageProperty{
                        OrderID:       order.ID,
                        PaymentMethod: ctx.PaymentMethod(),
                        UserID:        ctx.UserID(),
                        Amount:        order.ActualAmount,
                }
   
                // marshal the message property (not the request context)
                bytes, err := json.Marshal(property)
                if err != nil {
                        return errors.Wrapf(err, "error marshal mq pending message property in order creation with %v", property)
                }
   
                mm := entity.MqPendingMessage{
                        Tag:       constants.RocketMQTagOrderPaid.String(),
                        Property:  bytes,
                        Topic:     "API",
                        CreatedAt: time.Now(),
                }
                if err := tx.Create(&mm).Error; err != nil {
                        return errors.Wrapf(err, "error creation mq pending message in order creation with %v", mm)
                }
   
                // commit transaction
                if err := tx.Commit().Error; err != nil {
                        return errors.Wrapf(err, "error committing in order creation with user id: %d", ctx.UserID())
                }
   
                // send transaction message here
                msg, err := rocketmq.NewMessage(ctx.Context()).
                        SetInt64("pending_id", mm.ID).
                        SetInt64("order_id", order.ID).
                        SetString("payment_method", ctx.PaymentMethod()).
                        SetInt64("user_id", ctx.UserID()).
                        SetInt("amount", order.ActualAmount).
                        Encode(constants.RocketMQTagPaymentSucceed)
                if err != nil {
                        logger.Logger.Errorf("order creation: error creating message with order id: %d and err: %v", order.ID, err)
                        return nil
                }
                producer := MQProducers[constants.RocketMQProducerGroupPayment]
                if _, err = producer.SendMessageInTransaction(ctx.Context(), msg); err != nil {
                        logger.Logger.Errorf("order creation: error sending message with order id: %d and err: %v", order.ID, err)
                        return nil
                }
                return nil
        }
   }
   ```


