plhwin opened a new issue #381: [native] bug for multiple producer instances in 
a same process
URL: https://github.com/apache/rocketmq-client-go/issues/381
 
 
   Please tell us about your environment:
        - What is your OS? macOS Catalina 10.15.2
   
        - What is your client version? rocketmq-client-go branch v2.0.0-rc1 
(branch native got the same result)
   
        - What is your RocketMQ version?  V4_6_0
   
   I simulated a typical quote-sending workload from a FinTech setting: data 
arrives for multiple liquidities at the same time (each liquidity has several 
different symbols), and liquidity + symbol is used as the sharding key when 
sending to TopicTest. The topic TopicTest has 16 queues.
   
   The program simulates this as follows:
   1. a separate producer instance is created for each liquidity
   2. each liquidity sends partition-ordered messages to RocketMQ in a separate 
goroutine
   
   However, the SendOneWay (or SendSync) method seems to fail; the error 
was: 「 send error: the topic=TopicTest route info not found」
   
   here is the example code below:
   
   
   `package main
   
   import (
        "context"
        "flag"
        "log"
        "os"
        "strconv"
        "strings"
        "sync"
        "time"
   
        "github.com/apache/rocketmq-client-go"
        "github.com/apache/rocketmq-client-go/primitive"
        "github.com/apache/rocketmq-client-go/producer"
   )
   
   var (
        topic     = flag.String("topic", "TopicTest", "producer topic")
        group     = flag.String("group", "testGroupProducer", "producer group")
        liquidity = flag.String("liquidity", "L1,L2", "liquidity")
        symbol    = flag.String("symbol", "S1", "symbol")
        ns        = flag.String("ns", "127.0.0.1:49876", "producer nameServer")
        limit     = flag.Int("limit", 10000, "limit on the number of data sent 
by each symbol under each liquidity")
        producers map[string]rocketmq.Producer
   )
   
   func init() {
        flag.Parse()
        producers = make(map[string]rocketmq.Producer)
   }
   
   // each liquidity corresponds to a producer instance
   func initProducers() {
        liquidities := strings.Split(*liquidity, ",")
        for i, liquidity := range liquidities {
                var err error
                producers[liquidity], err = rocketmq.NewProducer(
                        producer.WithNameServer(strings.Split(*ns, ",")),
                        producer.WithGroupName(*group),
                        //producer.WithRetry(2),
                        //producer.WithCredentials(primitive.Credentials{
                        //      AccessKey: "RocketMQ",
                        //      SecretKey: "12345678",
                        //}),
                )
                if err != nil {
                        log.Println("init producer error:", liquidity, 
err.Error())
                        os.Exit(0)
                }
                if err = producers[liquidity].Start(); err != nil {
                        log.Println("start producer error:", liquidity, 
err.Error())
                        os.Exit(1)
                }
                log.Println("start producer success:", liquidity, i)
        }
   }
   
   // wg counts the per-liquidity sender goroutines so that main can wait for
   // all of them to finish.
   var wg sync.WaitGroup
   
   // main starts the producers, launches one sender goroutine per liquidity,
   // and blocks until every sender goroutine has finished.
   func main() {
        initProducers()
        log.Println("==========start==========")

        // Every liquidity is given the same symbols, taken from the -symbol flag.
        symbols := strings.Split(*symbol, ",")
        for name := range producers {
                wg.Add(1)
                go liquidityStart(name, symbols)
        }

        // Wait for the background goroutines to finish.
        wg.Wait()
   }
   
   // liquidityStart sends the mock data for each of the given symbols, one
   // symbol after another, through the producer registered for liquidity. When
   // sending is finished it shuts that producer down (logging any shutdown
   // error) and then marks itself done on wg. The caller is expected to have
   // called wg.Add(1) before starting it.
   func liquidityStart(liquidity string, symbols []string) {
        // Deferred calls run last-in-first-out: the producer is shut down first
        // and only then is wg released. Releasing wg first would let main return
        // from wg.Wait and end the process while Shutdown is still running.
        defer wg.Done()
        defer func() {
                if err := producers[liquidity].Shutdown(); err != nil {
                        log.Println("shutdown error:", liquidity, err)
                }
        }()

        for _, s := range symbols {
                sendOrderly(liquidity, s)
        }
   }
   
   // send mock data
   func sendOrderly(liquidity, symbol string) {
        for i := 0; i < *limit; i++ {
                // millisecond
                timeNow := time.Now()
                millisecond := timeNow.UnixNano() / int64(time.Millisecond)
                liquidity := liquidity
                sequence := strconv.Itoa(i)
                timestamp := strconv.Itoa(int(millisecond))
                bidQueue := 
"0.88099@1000000;0.88098@750000;0.88097@1500000;0.88096@2000000;0.88096@2000000"
                askQueue := 
"0.88101@600000;0.88103@1050000;0.88104@500000;0.88105@2250000;0.88106@4500000"
                message := liquidity + "|" + symbol + "|" + sequence + "|" + 
timestamp + "|" + bidQueue + "|" + askQueue
   
                m1 := &primitive.Message{
                        Topic: *topic,
                        Body:  []byte(message),
                }
                shardingKey := liquidity + symbol
                m1.WithShardingKey(shardingKey)
                err := producers[liquidity].SendOneWay(context.Background(), m1)
                if err != nil {
                        log.Println("send error:", err, liquidity, symbol, i)
                } else {
                        log.Println("send success:", liquidity, symbol, i, 
sequence)
                }
        }
   }
   `
   
   
   The program accepts parameters like the following:
   go run main.go -limit=10 -liquidity=L1,L2,L3 -symbol=S1   
   
   The results for several different parameter combinations are shown below:
   
   
![image](https://user-images.githubusercontent.com/1219345/72038885-f783e980-32dd-11ea-9032-383eb729b743.png)
   `go run main.go -limit=10 -liquidity=L1 -symbol=S1 `
   With only one liquidity (L1), which means only one producer instance, the 
results are as expected: all messages are sent successfully.
   
   
   
![image](https://user-images.githubusercontent.com/1219345/72038969-47fb4700-32de-11ea-808e-152ef5b6ac83.png)
   `go run main.go -limit=10 -liquidity=L1,L2 -symbol=S1`
   
   With two producer instances, only L2 succeeds; all sends for L1 fail.
   
   
![image](https://user-images.githubusercontent.com/1219345/72039012-6c572380-32de-11ea-955c-6d824fabc491.png)
   `go run main.go -limit=10 -liquidity=L1,L2,L3 -symbol=S1`
   With 3 producer instances, only L3 succeeds; all sends for L1 and L2 fail.
   
   In summary, it is always only the last producer instance that succeeds.
   
   
![image](https://user-images.githubusercontent.com/1219345/72039069-a7f1ed80-32de-11ea-9f5b-dc204a0f8043.png)
   `go run main.go -limit=10 -liquidity=L1 -symbol=S1,S2,S3`
   With 1 liquidity (one producer instance) and 3 symbols under that liquidity, 
all sends succeed.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to