smiletrl edited a comment on issue #668:
URL:
https://github.com/apache/rocketmq-client-go/issues/668#issuecomment-871201564
@ThreeBearsDan This project is fairly slow to handle new pull requests.
Also, it is sometimes a bad idea to depend on the mq transaction message
directly. For example, if mq is temporarily down, some of the most critical
business features go down with it, just because the transaction message
failed to send. The order service, for instance, would be completely broken
because it cannot send the order paid/canceled transaction message. Another
problem is that the transaction message content may depend on data committed
by the local transaction.
Here's one solution that still makes use of the transaction message.
To keep strong consistency, instead of moving all the business logic into
the transaction listener's `ExecuteLocalTransaction`, the business logic stays
where it is. Once the local transaction has succeeded, the mq side can
subscribe to the local database change and send a transaction message. For
example, if an order's status has changed from `pending_payment` to
`success_paid`, a new mq message should be sent out.
Writing a subscription system for database binlog changes might be too much
work, so the idea is to add an intermediate table, `mq_pending_messages`,
such as:
```
id, tag, topic, property, created_at
1, order_paid, order, {order_id: 23, user_id: 12}, 2021-02-12
```
When the local transaction makes the order-paid change, insert a new record
into the above table within the same local transaction. Column `property` is
a JSON field that holds the message fields to be sent out. The mq transaction
message also carries the id of this record, for example as a `pending_id`
field.
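As a rough illustration of the row described above, the sketch below models
`mq_pending_messages` as a Go struct and builds the record that would be
inserted alongside the order update. The names `MqPendingMessage`,
`OrderPaidProperty` and `NewPendingMessage` are hypothetical, and the ID is
left at zero on the assumption that the database assigns it on insert.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// MqPendingMessage mirrors one row of the mq_pending_messages table.
// Hypothetical type; the fields follow the columns listed above.
type MqPendingMessage struct {
	ID        int64 // assumed to be assigned by the database
	Tag       string
	Topic     string
	Property  []byte // JSON-encoded message fields
	CreatedAt time.Time
}

// OrderPaidProperty is an assumed payload for the order_paid tag.
type OrderPaidProperty struct {
	OrderID int64 `json:"order_id"`
	UserID  int64 `json:"user_id"`
}

// NewPendingMessage encodes property as JSON and returns the row that should
// be inserted in the same local transaction as the business change.
func NewPendingMessage(tag, topic string, property interface{}) (MqPendingMessage, error) {
	b, err := json.Marshal(property)
	if err != nil {
		return MqPendingMessage{}, fmt.Errorf("marshal property for tag %s: %w", tag, err)
	}
	return MqPendingMessage{
		Tag:       tag,
		Topic:     topic,
		Property:  b,
		CreatedAt: time.Now(),
	}, nil
}

func main() {
	row, err := NewPendingMessage("order_paid", "order", OrderPaidProperty{OrderID: 23, UserID: 12})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %s %s\n", row.Tag, row.Topic, row.Property)
}
```

Keeping `property` as opaque JSON means the table does not need a new column
for every message type.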
Then send a transaction message based on the record from the above table.
The transaction message's `ExecuteLocalTransaction` deletes that record, and
`CheckLocalTransaction` looks up the table `mq_pending_messages` to see
whether the `pending_id` carried in the message has been deleted.
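The listener side of this flow could look roughly like the sketch below. It
does not use the RocketMQ client: `State`, `PendingStore`, `memStore`,
`ExecuteLocal` and `CheckLocal` are hypothetical stand-ins for the listener's
return state, the `mq_pending_messages` table, and the two listener callbacks.
Retrying the delete when the check still finds the row is an assumption of
this sketch, not something stated above.

```go
package main

import (
	"errors"
	"fmt"
)

// State stands in for the listener's local transaction state.
type State int

const (
	Commit State = iota
	Rollback
	Unknown
)

// PendingStore is an assumed abstraction over mq_pending_messages.
type PendingStore interface {
	Delete(id int64) error
	Exists(id int64) (bool, error)
}

// memStore is an in-memory PendingStore used only for illustration.
type memStore struct {
	rows map[int64]bool
	down bool // simulates the database being unavailable
}

func newMemStore(ids ...int64) *memStore {
	s := &memStore{rows: map[int64]bool{}}
	for _, id := range ids {
		s.rows[id] = true
	}
	return s
}

func (s *memStore) Delete(id int64) error {
	if s.down {
		return errors.New("store unavailable")
	}
	delete(s.rows, id) // deleting a missing row is a no-op
	return nil
}

func (s *memStore) Exists(id int64) (bool, error) {
	if s.down {
		return false, errors.New("store unavailable")
	}
	return s.rows[id], nil
}

// ExecuteLocal plays the role of ExecuteLocalTransaction: it deletes the
// pending row and commits the message only if the delete succeeded.
func ExecuteLocal(s PendingStore, pendingID int64) State {
	if err := s.Delete(pendingID); err != nil {
		return Unknown // leave the decision to a later check
	}
	return Commit
}

// CheckLocal plays the role of CheckLocalTransaction: a missing row means the
// delete already happened, so the message can be committed.
func CheckLocal(s PendingStore, pendingID int64) State {
	exists, err := s.Exists(pendingID)
	if err != nil {
		return Unknown
	}
	if exists {
		// Assumption: retry the delete instead of waiting for another check.
		return ExecuteLocal(s, pendingID)
	}
	return Commit
}

func main() {
	store := newMemStore(1)
	fmt.Println(ExecuteLocal(store, 1) == Commit)
	fmt.Println(CheckLocal(store, 1) == Commit)
}
```

Returning the unknown state on a database error keeps the message
half-committed, so the check callback gets another chance to resolve it.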
Here's a code sample:
```
func (c creation) defaultAfterSuccessPayment() Invoker {
	return func(ctx Context, order *entity.Order) error {
		// other local transaction
		// ...

		// now set order status to be paid
		status := constants.GetOrderPaidStatus(c.Method())
		tx := ctx.TX()
		if err := tx.Model(entity.Order{}).
			Where("id = ?", order.ID).
			Update("status", status).Error; err != nil {
			return errors.Wrapf(err, "error updating order: %d status to be status: %s after success payment", order.ID, status)
		}

		// create a new record in table `mq_pending_messages`
		// within the same local transaction
		property := OrderPaidMessageProperty{
			OrderID:       order.ID,
			PaymentMethod: ctx.PaymentMethod(),
			UserID:        ctx.UserID(),
			Amount:        order.ActualAmount,
		}
		bytes, err := json.Marshal(property)
		if err != nil {
			return errors.Wrapf(err, "error marshal mq pending message property in order creation with %v", property)
		}
		mm := entity.MqPendingMessage{
			Tag:       constants.RocketMQTagOrderPaid.String(),
			Property:  bytes,
			Topic:     "API",
			CreatedAt: time.Now(),
		}
		if err := tx.Create(&mm).Error; err != nil {
			return errors.Wrapf(err, "error creation mq pending message in order creation with %v", mm)
		}

		// commit transaction
		if err := tx.Commit().Error; err != nil {
			return errors.Wrapf(err, "error committing in order creation with user id: %d", ctx.UserID())
		}

		// send transaction message here; mm.ID is carried as `pending_id`
		msg, err := rocketmq.NewMessage(ctx.Context()).
			SetInt64("pending_id", mm.ID).
			SetInt64("order_id", order.ID).
			SetString("payment_method", ctx.PaymentMethod()).
			SetInt64("user_id", ctx.UserID()).
			SetInt("amount", order.ActualAmount).
			Encode(constants.RocketMQTagPaymentSucceed)
		if err != nil {
			// The local transaction is already committed, so only log
			// the failure instead of failing the request.
			logger.Logger.Errorf("order creation: error creating message with order id: %d and err: %v", order.ID, err)
			return nil
		}
		producer := MQProducers[constants.RocketMQProducerGroupPayment]
		if _, err = producer.SendMessageInTransaction(ctx.Context(), msg); err != nil {
			// Same as above: a failed send must not break the order flow.
			logger.Logger.Errorf("order creation: error sending message with order id: %d and err: %v", order.ID, err)
			return nil
		}
		return nil
	}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.