This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-e2e.git
The following commit(s) were added to refs/heads/master by this push:
new cf7335b add go e2e test case&BDD description example (#11)
cf7335b is described below
commit cf7335b3fc1e9bd74b17af1bd81f9f0a31f4e491
Author: rook1ewang <[email protected]>
AuthorDate: Tue Mar 14 15:00:33 2023 +0800
add go e2e test case&BDD description example (#11)
Co-authored-by: ww269266 <[email protected]>
---
.gitignore | 3 +-
bdd/README-CN.md | 33 +++
bdd/README.md | 35 +++
bdd/normal.feature | 18 ++
.gitignore => common/.gitignore | 3 +-
common/bin/env.sh | 46 ++++
common/bin/mqadmin | 45 ++++
common/bin/tools.sh | 43 ++++
common/pom.xml | 75 +++++++
common/release.xml | 55 +++++
golang/.gitignore | 27 +++
golang/README.md | 55 +++--
golang/bin/run.sh | 24 +++
golang/client_test.go | 22 --
golang/main.go | 22 ++
golang/mqgotest/delay/delaymsg_test.go | 125 +++++++++++
golang/mqgotest/fifo/ordermsg_test.go | 75 +++++++
golang/mqgotest/normal/normalmsg_test.go | 118 ++++++++++
golang/mqgotest/transaction/transmsg_test.go | 91 ++++++++
golang/utils/CheckUtils.go | 77 +++++++
golang/utils/ClientUtils.go | 307 +++++++++++++++++++++++++++
golang/utils/MQAdminUtils.go | 154 ++++++++++++++
golang/utils/MsgCheck.go | 46 ++++
golang/utils/NameUtils.go | 41 ++++
24 files changed, 1495 insertions(+), 45 deletions(-)
diff --git a/.gitignore b/.gitignore
index 8912d5b..7593935 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,4 +18,5 @@ bazel-bin
bazel-rocketmq
bazel-testlogs
.vscode
-*.py
\ No newline at end of file
+*.py
+/rocketmq-admintools/
diff --git a/bdd/README-CN.md b/bdd/README-CN.md
new file mode 100644
index 0000000..3f08067
--- /dev/null
+++ b/bdd/README-CN.md
@@ -0,0 +1,33 @@
+# 背景
+
+RocketMQ5.0
客户端SDK的目标是提供各种主流编程语言的原生实现,不可避免的要考虑如何解决各个语言客户端的代码质量问题。尽管各编程语言的客户端实现形式不同,但是用户行为表现是一致的。以RocketMQ5.0
Java客户端为例,通过暴露给用户的接口可以看到,用户实际使用中涉及的class及method有限。基于以下两点,提出了一个语言无关的各编程语言客户端E2E测试方法:
+
+- 用户行为在各语言下结果一致,E2E测试场景通过对暴露的有限的接口进行用户行为的合理组合,可以做到99%的用户使用场景或行为覆盖。
+- 以BDD given-when-then 方式
作为各个语言客户端的E2E测试场景的统一描述定义。可以做到E2E测试在不同编程语言SDK下统一语义。实现E2E实际编码与定义解耦。
+
+## BDD example
+```
+exmaples:
+
+Feature: Test producer send message with normal topic
+
+ Background:
+ Given Instance username、password and endpint
+
+ Scenario: send normal msgs sync
+ Given normal topic name to use
+ When create normal topic
+ And create and start producer
+ And use producer send 10 msgs
+ Then receive 10 success response from server
+
+ Scenario: send normal msgs async
+ Given normal topic name to use
+ When create normal topic
+ And create and start producer
+ And use producer async send 10 msgs
+ Then receive 10 success callback response from server
+```
+## 代码实现
+
+目前已开源java、go
E2E测试场景,欢迎大家积极共建。利用各语言原生的测试框架建立各语言的E2E测试框架及测试场景,一起提高RocketMQ5.0的客户端质量。
diff --git a/bdd/README.md b/bdd/README.md
new file mode 100644
index 0000000..38ae0ad
--- /dev/null
+++ b/bdd/README.md
@@ -0,0 +1,35 @@
+# Background
+
+The goal of the RocketMQ5.0 client SDK is to provide the implementation of all
mainstream programming languages,it is inevitable to consider how to solve
quality problem of each programming language client.
+Although the client implementation of each programming language are different,
the user behavior is consistent. Taking the RocketMQ5.0 Java client as an
example, it can be seen from the interface exposed to the user that the classes
and methods involved in the actual use for the user are limited.
+Based on the following two foundations, a language-independent E2E test
co-construction method for client of each programming language is proposed:
+
+- The results of user behavior are consistent in each language. E2E test
scenarios can cover 99% of user scenarios or behaviors through a reasonable
combination of user behaviors on the exposed limited interfaces.
+- In the BDD given-when-then way, which is used as a unified description and
definition of E2E test scenarios for each programming language client. It is
possible to achieve unified semantics of E2E tests under different programming
language SDKs. Realize the decoupling of E2E actual coding and definition.
+
+## BDD example
+```
+exmaples:
+
+Feature: Test producer send message with normal topic
+
+ Background:
+ Given Instance username、password and endpint
+
+ Scenario: send normal msgs sync
+ Given normal topic name to use
+ When create normal topic
+ And create and start producer
+ And use producer send 10 msgs
+ Then receive 10 success response from server
+
+ Scenario: send normal msgs async
+ Given normal topic name to use
+ When create normal topic
+ And create and start producer
+ And use producer async send 10 msgs
+ Then receive 10 success callback response from server
+```
+## Code implementation
+
+At present, the Java and Go E2E test scenarios have been open sourced, welcome
to co-construction. Use the test frameworks of each programming language to
establish E2E test frameworks and test scenarios in each programming language,
and improve the client quality of RocketMQ5.0 together.
diff --git a/bdd/normal.feature b/bdd/normal.feature
new file mode 100644
index 0000000..42d8134
--- /dev/null
+++ b/bdd/normal.feature
@@ -0,0 +1,18 @@
+Feature: Test producer send message with normal topic
+
+ Background:
+ Given username、password and endpoint
+
+ Scenario: send normal msgs with sync method and recv by simpleconsumer
+ Given normal topic name to use
+ When create normal topic
+ And build and start consumer、producer,consumer subscribe topic
+ And use producer sync send 10 msgs
+ Then receive 10 msgs from server by simpleconsumer
+
+ Scenario: send normal msgs with async method and recv by simpleconsumer
+ Given normal topic name to use
+ When create normal topic
+ And build and start consumer、producer,consumer subscribe topic
+ And use producer async send 10 msgs
+ Then receive 10 msgs from server by simpleconsumer
\ No newline at end of file
diff --git a/.gitignore b/common/.gitignore
similarity index 97%
copy from .gitignore
copy to common/.gitignore
index 8912d5b..0c6711e 100644
--- a/.gitignore
+++ b/common/.gitignore
@@ -18,4 +18,5 @@ bazel-bin
bazel-rocketmq
bazel-testlogs
.vscode
-*.py
\ No newline at end of file
+*.py
+
diff --git a/common/bin/env.sh b/common/bin/env.sh
new file mode 100644
index 0000000..afe2a8c
--- /dev/null
+++ b/common/bin/env.sh
@@ -0,0 +1,46 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+nameserverkey="nameserver"
+brokerkey="broker"
+proxykey="proxy"
+
+if [ -z "$ALL_IP" ] ; then
+ echo "ERROR: ENV ALL_IP not exists!!";
+ exit 1;
+ else
+ allip=(`echo $ALL_IP | tr ',' ' '`)
+ for i in "${allip[@]}"
+ do
+ if result=$(echo $i | grep "${nameserverkey}");then
+ echo "NAMESERVER=${i#*:}:9876";
+ export NAMESERVER=${i#*:}:9876 ;
+ elif result=$(echo $i | grep "${proxykey}"); then
+ # opensource 8081
+ echo "GRPC_ENDPOINT=${i#*:}:8081";
+ export GRPC_ENDPOINT=${i#*:}:8081 ;
+ elif result=$(echo $i | grep "${brokerkey}"); then
+ echo "BROKER_ADDR=${i#*:}:10911";
+ export BROKER_ADDR=${i#*:}:10911 ;
+ fi
+ done
+fi
+
+if [ -z "$CLUSTER_NAME" ] ; then
+ export CLUSTER_NAME=DefaultCluster ;
+fi
+
diff --git a/common/bin/mqadmin b/common/bin/mqadmin
new file mode 100755
index 0000000..980d6e3
--- /dev/null
+++ b/common/bin/mqadmin
@@ -0,0 +1,45 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ -z "$ROCKETMQ_HOME" ] ; then
+ ## resolve links - $0 may be a link to maven's home
+ PRG="$0"
+
+ # need this for relative symlinks
+ while [ -h "$PRG" ] ; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG="`dirname "$PRG"`/$link"
+ fi
+ done
+
+ saveddir=`pwd`
+
+ ROCKETMQ_HOME=`dirname "$PRG"`/..
+
+ # make it fully qualified
+ ROCKETMQ_HOME=`cd "$ROCKETMQ_HOME" && pwd`
+
+ cd "$saveddir"
+fi
+
+export ROCKETMQ_HOME
+
+sh ${ROCKETMQ_HOME}/bin/tools.sh
org.apache.rocketmq.tools.command.MQAdminStartup "$@"
diff --git a/common/bin/tools.sh b/common/bin/tools.sh
new file mode 100755
index 0000000..4096f61
--- /dev/null
+++ b/common/bin/tools.sh
@@ -0,0 +1,43 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#===========================================================================================
+# Java Environment Setting
+#===========================================================================================
+error_exit ()
+{
+ echo "ERROR: $1 !!"
+ exit 1
+}
+
+
+[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
+[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
+[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME
variable in your environment, We need java(x64)!"
+
+export JAVA_HOME
+export JAVA="$JAVA_HOME/bin/java"
+export BASE_DIR=$(dirname $0)/..
+export CLASSPATH=.:${BASE_DIR}/conf:${BASE_DIR}/lib/*:${CLASSPATH}
+
+#===========================================================================================
+# JVM Configuration
+#===========================================================================================
+JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:MetaspaceSize=128m
-XX:MaxMetaspaceSize=128m"
+JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
+
+$JAVA ${JAVA_OPT} "$@"
diff --git a/common/pom.xml b/common/pom.xml
new file mode 100644
index 0000000..3501362
--- /dev/null
+++ b/common/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-java-test</artifactId>
+ <packaging>pom</packaging>
+ <version>1.0-SNAPSHOT</version>
+
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <rocketmq.tools.version>5.0.0</rocketmq.tools.version>
+ </properties>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-tools</artifactId>
+ <version>${rocketmq.tools.version}</version>
+ </dependency>
+ </dependencies>
+
+
+ <profiles>
+ <profile>
+ <id>release</id>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>release</id>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <descriptors>
+ <descriptor>release.xml</descriptor>
+ </descriptors>
+ <appendAssemblyId>false</appendAssemblyId>
+
<outputDirectory>${project.basedir}/..</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <finalName>rocketmq-admintools</finalName>
+ </build>
+ </profile>
+ </profiles>
+
+</project>
\ No newline at end of file
diff --git a/common/release.xml b/common/release.xml
new file mode 100644
index 0000000..706c451
--- /dev/null
+++ b/common/release.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<assembly>
+ <id>release</id>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <formats>
+ <format>dir</format>
+ </formats>
+ <fileSets>
+ <fileSet>
+ <includes>
+ <include>bin/**</include>
+ </includes>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ </fileSets>
+
+ <dependencySets>
+ <dependencySet>
+ <unpack>false</unpack>
+ <useProjectArtifact>true</useProjectArtifact>
+ <outputDirectory>lib</outputDirectory>
+ <scope>provided</scope>
+ </dependencySet>
+ <dependencySet>
+ <unpack>false</unpack>
+ <useProjectArtifact>true</useProjectArtifact>
+ <outputDirectory>lib</outputDirectory>
+ <scope>system</scope>
+ </dependencySet>
+ <dependencySet>
+ <unpack>false</unpack>
+ <useProjectArtifact>true</useProjectArtifact>
+ <outputDirectory>lib</outputDirectory>
+ <scope>runtime</scope>
+ </dependencySet>
+ </dependencySets>
+
+</assembly>
diff --git a/golang/.gitignore b/golang/.gitignore
new file mode 100644
index 0000000..408c22e
--- /dev/null
+++ b/golang/.gitignore
@@ -0,0 +1,27 @@
+.idea/
+.DS_Store
+output/
+
+# Binaries for programs and plugins
+*.exe
+*.exe~
+*.dll
+*.so
+*.dylib
+
+# Test binary, built with `go test -c`
+*.test
+
+# Output of the go coverage tool, specifically when used with LiteIDE
+*.out
+
+# Dependency directories (remove the comment below to include it)
+# vendor/
+
+go.work
+
+
+go.mod
+go.sum
+
+cover/
\ No newline at end of file
diff --git a/golang/README.md b/golang/README.md
index c9cc7e5..41de0a2 100644
--- a/golang/README.md
+++ b/golang/README.md
@@ -1,32 +1,45 @@
## Apache RocketMQ E2E
[](https://www.apache.org/licenses/LICENSE-2.0.html)
-RocketMQ E2E Test
+RocketMQ Goland E2E Test
### Test Case Coverage
* Message Type
- * Normal message
- * Transaction message
- * Order message
- * Delay message
+ * Normal message
+ * Transaction message
+ * Order message
+ * Delay message
* Producer
- * Sync Send
- * Async Send
-* PushConsumer
+ * Sync Send
+ * Async Send
+* **PushConsumer (sdk not accomplished)**
* SimpleConsumer
- * Order/Delay/Transaction/Normal
- * Sync receive/Async receive
- * Sync ack/Async ack
-* Client init
- * Parameter settings
+ * Order/Delay/Transaction/Normal
+ * Sync receive/**Async receive (sdk not accomplished)**
+ * Sync ack/Async ack
+* Client init(Producer/SimpleConsumer/**PushConsumer (sdk not accomplished)**)
+ * Parameter settings
+* Model
+ * broadcast
+ * cluster
* Message
- * Tag
- * Body
- * Key
- * User property
+ * Tag
+ * Body
+ * Key
+ * User property
* Filter
- * Tag
- * Sql
+ * Tag
+ * Sql
* Retry
- * Normal message
- * Order message
+ * Normal message
+ * Order message
+
+#### How to start
+```angular2html
+#nameserver、endpoint and broker 、clustername was from ENV ALL_IP,You can view
the details in common/bin/env.sh
+# cd project and run go e2e test case
+cd golang && sh bin/run.sh
+```
+##### Options
+* `ALL_IP` : required, set by GitHub actions
+* `cluster`: not required, default `DefaultCluster`
diff --git a/golang/bin/run.sh b/golang/bin/run.sh
new file mode 100644
index 0000000..a1b2200
--- /dev/null
+++ b/golang/bin/run.sh
@@ -0,0 +1,24 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# cd project base dir to compile mqadmin utils for other language e2e test
using
+cd ../common && mvn -Prelease -DskipTests clean package -U
+# set env for mqadmin (use source to set linux env variables in current shell)
+cd ../rocketmq-admintools && source bin/env.sh
+# run go e2e test case
+cd ../golang && go test ./mqgotest/... -timeout 2m -v
diff --git a/golang/client_test.go b/golang/client_test.go
deleted file mode 100644
index 8edb68f..0000000
--- a/golang/client_test.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package golang
-
-import "fmt"
-
-import "testing"
-
-func TestHello(t *testing.T) {
- got := Hello()
- want := "Hello, world"
-
- if got != want {
- t.Errorf("got %q want %q", got, want)
- }
-}
-
-func Hello() string {
- return "Hello, world"
-}
-
-func main() {
- fmt.Println(Hello())
-}
diff --git a/golang/main.go b/golang/main.go
new file mode 100644
index 0000000..a673c1a
--- /dev/null
+++ b/golang/main.go
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+func main() {
+
+}
diff --git a/golang/mqgotest/delay/delaymsg_test.go
b/golang/mqgotest/delay/delaymsg_test.go
new file mode 100644
index 0000000..6ae5d8c
--- /dev/null
+++ b/golang/mqgotest/delay/delaymsg_test.go
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package delay_test
+
+import (
+ . "rocketmq-go-e2e/utils"
+ "sync"
+ "testing"
+ "time"
+)
+
+func TestDelayMsg(t *testing.T) {
+ t.Parallel()
+ var (
+ wg sync.WaitGroup
+ recvMsgCollector *RecvMsgsCollector
+ sendMsgCollector = NewSendMsgsCollector()
+ // maximum number of messages received at one time
+ maxMessageNum int32 = 32
+ // invisibleDuration should > 20s
+ invisibleDuration = time.Second * 20
+ // receive messages in a loop
+ testTopic = GetTopicName()
+ nameServer = NAMESERVER
+ grpcEndpoint = GRPC_ENDPOINT
+ clusterName = CLUSTER_NAME
+ ak = ""
+ sk = ""
+ cm = GetGroupName()
+ msgtag = RandomString(8)
+ keys = RandomString(8)
+ msgCount = 10
+ delaySeconds = 2
+ )
+
+ CreateDelayTopic(testTopic, "", clusterName, nameServer)
+ simpleConsumer := BuildSimpleConsumer(grpcEndpoint, cm, msgtag, ak, sk,
testTopic)
+ // graceful stop simpleConsumer
+ defer simpleConsumer.GracefulStop()
+
+ // new producer instance
+ producer := BuildProducer(grpcEndpoint, ak, sk, testTopic)
+ // graceful stop producer
+ defer producer.GracefulStop()
+ wg.Add(1)
+
+ go func() {
+ recvMsgCollector = RecvMessage(simpleConsumer, maxMessageNum,
invisibleDuration, int64(10+delaySeconds))
+ wg.Done()
+ }()
+ go func() {
+ for i := 0; i < msgCount; i++ {
+ var msg = BuildDelayMessage(testTopic, "test", msgtag,
time.Duration(delaySeconds), keys)
+ SendMessage(producer, msg, sendMsgCollector)
+ }
+ }()
+ wg.Wait()
+
+ CheckMsgsWithMsgId(t, sendMsgCollector, recvMsgCollector)
+}
+
+func TestDelayMsgAsync(t *testing.T) {
+ t.Parallel()
+ var (
+ wg sync.WaitGroup
+ recvMsgCollector *RecvMsgsCollector
+ sendMsgCollector = NewSendMsgsCollector()
+ // maximum number of messages received at one time
+ maxMessageNum int32 = 32
+ // invisibleDuration should > 20s
+ invisibleDuration = time.Second * 20
+ // receive messages in a loop
+ testTopic = GetTopicName()
+ nameServer = NAMESERVER
+ grpcEndpoint = GRPC_ENDPOINT
+ clusterName = CLUSTER_NAME
+ ak = ""
+ sk = ""
+ cm = GetGroupName()
+ msgtag = RandomString(8)
+ keys = RandomString(8)
+ msgCount = 10
+ delaySeconds = 2
+ )
+
+ CreateDelayTopic(testTopic, "", clusterName, nameServer)
+ simpleConsumer := BuildSimpleConsumer(grpcEndpoint, cm, msgtag, ak, sk,
testTopic)
+ // graceful stop simpleConsumer
+ defer simpleConsumer.GracefulStop()
+
+ // new producer instance
+ producer := BuildProducer(grpcEndpoint, ak, sk, testTopic)
+ // graceful stop producer
+ defer producer.GracefulStop()
+ wg.Add(1)
+
+ go func() {
+ recvMsgCollector = RecvMessage(simpleConsumer, maxMessageNum,
invisibleDuration, int64(20+delaySeconds))
+ wg.Done()
+ }()
+ go func() {
+ for i := 0; i < msgCount; i++ {
+ var msg = BuildDelayMessage(testTopic, "test", msgtag,
time.Duration(delaySeconds), keys)
+ SendMessageAsync(producer, msg, sendMsgCollector)
+ }
+ }()
+ wg.Wait()
+
+ CheckMsgsWithMsgId(t, sendMsgCollector, recvMsgCollector)
+}
diff --git a/golang/mqgotest/fifo/ordermsg_test.go
b/golang/mqgotest/fifo/ordermsg_test.go
new file mode 100644
index 0000000..1763e61
--- /dev/null
+++ b/golang/mqgotest/fifo/ordermsg_test.go
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package order_test
+
+import (
+ . "rocketmq-go-e2e/utils"
+ "sync"
+ "testing"
+ "time"
+)
+
+func TestFiFOMsg(t *testing.T) {
+ t.Parallel()
+ var (
+ wg sync.WaitGroup
+ recvMsgCollector *RecvMsgsCollector
+ sendMsgCollector = NewSendMsgsCollector()
+ // maximum number of messages received at one time
+ maxMessageNum int32 = 32
+ // invisibleDuration should > 20s
+ invisibleDuration = time.Second * 20
+ // receive messages in a loop
+ testTopic = GetTopicName()
+ nameServer = NAMESERVER
+ grpcEndpoint = GRPC_ENDPOINT
+ clusterName = CLUSTER_NAME
+ ak = ""
+ sk = ""
+ cm = GetGroupName()
+ msgtag = RandomString(8)
+ keys = RandomString(8)
+ msgCount = 256
+ )
+
+ CreateFIFOTopic(testTopic, "", clusterName, nameServer)
+ CreateOrderlyConsumerGroup(cm, "", clusterName, nameServer)
+ simpleConsumer := BuildSimpleConsumer(grpcEndpoint, cm, msgtag, ak, sk,
testTopic)
+ // graceful stop simpleConsumer
+ defer simpleConsumer.GracefulStop()
+
+ // new producer instance
+ producer := BuildProducer(grpcEndpoint, ak, sk, testTopic)
+ // graceful stop producer
+ defer producer.GracefulStop()
+
+ wg.Add(1)
+
+ go func() {
+ recvMsgCollector = RecvMessage(simpleConsumer, maxMessageNum,
invisibleDuration, 30)
+ wg.Done()
+ }()
+
+ go func() {
+ for i := 0; i < msgCount; i++ {
+ var msg = BuildFIFOMessage(testTopic, "test", msgtag,
cm, keys)
+ SendMessage(producer, msg, sendMsgCollector)
+ }
+ }()
+ wg.Wait()
+ CheckFIFOMsgsWithMsgId(t, sendMsgCollector, recvMsgCollector)
+}
diff --git a/golang/mqgotest/normal/normalmsg_test.go
b/golang/mqgotest/normal/normalmsg_test.go
new file mode 100644
index 0000000..90fcb4f
--- /dev/null
+++ b/golang/mqgotest/normal/normalmsg_test.go
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package normal_test
+
+import (
+ . "rocketmq-go-e2e/utils"
+ "sync"
+ "testing"
+ "time"
+)
+
+func TestNormalMsgWithMsgId(t *testing.T) {
+ t.Parallel()
+ var (
+ wg sync.WaitGroup
+ // maximum number of messages received at one time
+ maxMessageNum int32 = 32
+ // invisibleDuration should > 20s
+ invisibleDuration = time.Second * 20
+ // receive messages in a loop
+ testTopic = GetTopicName()
+ nameServer = NAMESERVER
+ grpcEndpoint = GRPC_ENDPOINT
+ clusterName = CLUSTER_NAME
+ ak = ""
+ sk = ""
+ cm = GetGroupName()
+ msgtag = RandomString(8)
+ keys = RandomString(8)
+ msgCount = 10
+ )
+
+ CreateTopic(testTopic, "", clusterName, nameServer)
+ simpleConsumer := BuildSimpleConsumer(grpcEndpoint, cm, msgtag, ak, sk,
testTopic)
+ // graceful stop simpleConsumer
+ defer simpleConsumer.GracefulStop()
+
+ // new producer instance
+ producer := BuildProducer(grpcEndpoint, ak, sk, testTopic)
+ // graceful stop producer
+ defer producer.GracefulStop()
+
+ var recvMsgCollector *RecvMsgsCollector
+ var sendMsgCollector *SendMsgsCollector
+ wg.Add(1)
+
+ go func() {
+ recvMsgCollector = RecvMessage(simpleConsumer, maxMessageNum,
invisibleDuration, 10)
+ wg.Done()
+ }()
+ go func() {
+ sendMsgCollector = SendNormalMessage(producer, testTopic,
"test", msgtag, msgCount, keys)
+ }()
+ wg.Wait()
+
+ CheckMsgsWithMsgId(t, sendMsgCollector, recvMsgCollector)
+}
+
+func TestNormalMsgWithMsgIdAsync(t *testing.T) {
+ t.Parallel()
+ var (
+ wg sync.WaitGroup
+ // maximum number of messages received at one time
+ maxMessageNum int32 = 32
+ // invisibleDuration should > 20s
+ invisibleDuration = time.Second * 20
+ // receive messages in a loop
+ testTopic = GetTopicName()
+ nameServer = NAMESERVER
+ grpcEndpoint = GRPC_ENDPOINT
+ clusterName = CLUSTER_NAME
+ ak = ""
+ sk = ""
+ cm = GetGroupName()
+ msgtag = RandomString(8)
+ keys = RandomString(8)
+ msgCount = 10
+ )
+
+ CreateTopic(testTopic, "", clusterName, nameServer)
+ simpleConsumer := BuildSimpleConsumer(grpcEndpoint, cm, msgtag, ak, sk,
testTopic)
+ // graceful stop simpleConsumer
+ defer simpleConsumer.GracefulStop()
+ // new producer instance
+ producer := BuildProducer(grpcEndpoint, ak, sk, testTopic)
+ // graceful stop producer
+ defer producer.GracefulStop()
+
+ var recvMsgCollector *RecvMsgsCollector
+ var sendMsgCollector *SendMsgsCollector
+ wg.Add(1)
+
+ go func() {
+ recvMsgCollector = RecvMessageWithNum(simpleConsumer,
maxMessageNum, invisibleDuration, 20, msgCount)
+ wg.Done()
+ }()
+ go func() {
+ sendMsgCollector = SendNormalMessageAsync(producer, testTopic,
"test", msgtag, msgCount, keys)
+ }()
+ wg.Wait()
+
+ CheckMsgsWithMsgId(t, sendMsgCollector, recvMsgCollector)
+}
diff --git a/golang/mqgotest/transaction/transmsg_test.go
b/golang/mqgotest/transaction/transmsg_test.go
new file mode 100644
index 0000000..b6f3f9a
--- /dev/null
+++ b/golang/mqgotest/transaction/transmsg_test.go
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package transaction_test
+
+import (
+ rmq_client "github.com/apache/rocketmq-clients/golang"
+ "log"
+ . "rocketmq-go-e2e/utils"
+ "sync"
+ "testing"
+ "time"
+)
+
+func TestTransactionMsg(t *testing.T) {
+ t.Parallel()
+ var (
+ wg sync.WaitGroup
+ recvMsgCollector *RecvMsgsCollector
+ sendMsgCollector *SendMsgsCollector
+ // maximum number of messages received at one time
+ maxMessageNum int32 = 32
+ // invisibleDuration should > 20s
+ invisibleDuration = time.Second * 20
+ // receive messages in a loop
+ testTopic = GetTopicName()
+ nameServer = NAMESERVER
+ grpcEndpoint = GRPC_ENDPOINT
+ clusterName = CLUSTER_NAME
+ ak = ""
+ sk = ""
+ cm = GetGroupName()
+ msgtag = RandomString(8)
+ keys = RandomString(8)
+ msgCount = 10
+ )
+
+ wg.Add(1)
+
+ CreateTransactionTopic(testTopic, "", clusterName, nameServer)
+ simpleConsumer := BuildSimpleConsumer(grpcEndpoint, cm, msgtag, ak, sk,
testTopic)
+ // graceful stop simpleConsumer
+ defer simpleConsumer.GracefulStop()
+
+ // new producer instance
+ var checker = &rmq_client.TransactionChecker{
+ Check: func(msgView *rmq_client.MessageView)
rmq_client.TransactionResolution {
+ log.Printf("check transaction message: %v", msgView)
+ sendMsgCollector.MsgIds =
append(sendMsgCollector.MsgIds, msgView.GetMessageId())
+ msg := &rmq_client.Message{
+ Topic: msgView.GetTopic(),
+ Body: msgView.GetBody(),
+ Tag: msgView.GetTag(),
+ }
+ msg.SetKeys(msgView.GetKeys()...)
+ //msg.SetMessageGroup(*msgView.GetMessageGroup())
+ //msg.SetDelayTimestamp(*msgView.GetDeliveryTimestamp())
+ sendMsgCollector.SendMsgs =
append(sendMsgCollector.SendMsgs, msg)
+ return rmq_client.COMMIT
+ },
+ }
+
+ producer := BuildTransactionProducer(grpcEndpoint, ak, sk, checker,
testTopic)
+ // graceful stop producer
+ defer producer.GracefulStop()
+
+ go func() {
+ recvMsgCollector = RecvMessage(simpleConsumer, maxMessageNum,
invisibleDuration, 10)
+ wg.Done()
+ }()
+ go func() {
+ sendMsgCollector = SendTransactionMessage(producer, testTopic,
"test", msgtag, msgCount, keys)
+ }()
+ wg.Wait()
+
+ CheckMsgsWithMsgId(t, sendMsgCollector, recvMsgCollector)
+}
diff --git a/golang/utils/CheckUtils.go b/golang/utils/CheckUtils.go
new file mode 100644
index 0000000..7375491
--- /dev/null
+++ b/golang/utils/CheckUtils.go
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+ rmq_client "github.com/apache/rocketmq-clients/golang"
+ "github.com/stretchr/testify/assert"
+ "math"
+ "sort"
+ "testing"
+ "time"
+)
+
+// check msg with msgId received only once
+func CheckMsgsWithMsgId(t *testing.T, sendMsgsCollector *SendMsgsCollector,
recvMsgsCollector *RecvMsgsCollector) {
+ assert.Equal(t, len(sendMsgsCollector.MsgIds),
len(recvMsgsCollector.MsgIds), "they should be equal")
+ sort.Strings(recvMsgsCollector.MsgIds)
+ sort.Strings(sendMsgsCollector.MsgIds)
+ assert.Equal(t, sendMsgsCollector.MsgIds, recvMsgsCollector.MsgIds,
"they should be equal")
+}
+
+// check msg with msgId tag keys body received only once
+func CheckMsgsWithAll(t *testing.T, sendMsgsCollector *SendMsgsCollector,
recvMsgsCollector *RecvMsgsCollector) {
+ assert.Equal(t, len(sendMsgsCollector.MsgIds),
len(recvMsgsCollector.MsgIds), "they should be equal")
+ for i, msg := range sendMsgsCollector.SendMsgs {
+ var matchedMsgViews *rmq_client.MessageView
+ for j, msgViews := range recvMsgsCollector.RecvMsgViews {
+ if sendMsgsCollector.MsgIds[i] ==
msgViews.GetMessageId() {
+ matchedMsgViews =
recvMsgsCollector.RecvMsgViews[j]
+ break
+ }
+ }
+ assert.Equal(t, sendMsgsCollector.MsgIds[i],
matchedMsgViews.GetMessageId(), "they should be equal")
+ assert.Equal(t, msg.GetKeys(), matchedMsgViews.GetKeys(), "they
should be equal")
+ assert.Equal(t, msg.GetTag(), matchedMsgViews.GetTag(), "they
should be equal")
+ assert.Equal(t, msg.Body, matchedMsgViews.GetBody(), "they
should be equal")
+ }
+}
+
+func CheckDelayMsgsWithMsgId(t *testing.T, sendMsgsCollector
*SendMsgsCollector, recvMsgsCollector *RecvMsgsCollector, acceptableVariation
float64) {
+ CheckMsgsWithMsgId(t, sendMsgsCollector, recvMsgsCollector)
+ for i, msg := range sendMsgsCollector.SendMsgs {
+ var matchedMsgViews *rmq_client.MessageView
+ for j, msgViews := range recvMsgsCollector.RecvMsgViews {
+ if sendMsgsCollector.MsgIds[i] ==
msgViews.GetMessageId() {
+ matchedMsgViews =
recvMsgsCollector.RecvMsgViews[j]
+ break
+ }
+ }
+ assert.Equal(t, msg.GetDeliveryTimestamp(),
matchedMsgViews.GetDeliveryTimestamp(), "they should be equal")
+ assert.True(t,
math.Abs(float64(time.Now().Unix())-float64(msg.GetDeliveryTimestamp().Unix()))
< acceptableVariation, "they should be equal")
+ }
+}
+
+func CheckFIFOMsgsWithMsgId(t *testing.T, sendMsgsCollector
*SendMsgsCollector, recvMsgsCollector *RecvMsgsCollector) {
+ assert.Equal(t, len(sendMsgsCollector.MsgIds),
len(recvMsgsCollector.MsgIds), "they should be equal")
+ assert.Equal(t, sendMsgsCollector.MsgIds, recvMsgsCollector.MsgIds,
"they should be equal")
+}
+
+func CheckTransactionMsgsWithMsgId(t *testing.T, sendMsgsCollector
*SendMsgsCollector, recvMsgsCollector *RecvMsgsCollector) {
+ CheckMsgsWithMsgId(t, sendMsgsCollector, recvMsgsCollector)
+}
diff --git a/golang/utils/ClientUtils.go b/golang/utils/ClientUtils.go
new file mode 100644
index 0000000..99c9796
--- /dev/null
+++ b/golang/utils/ClientUtils.go
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+ "context"
+ "fmt"
+ rmq_client "github.com/apache/rocketmq-clients/golang"
+ "github.com/apache/rocketmq-clients/golang/credentials"
+ "log"
+ "os"
+ "strings"
+ "time"
+)
+
+var (
+ // maximum waiting time for receive func
+ awaitDuration = time.Second * 5
+ // maximum number of messages received at one time
+ maxMessageNum int32 = 32
+ // invisibleDuration should > 20s
+ invisibleDuration = time.Second * 20
+ // receive messages in a loop
+ GRPC_ENDPOINT = os.Getenv("GRPC_ENDPOINT")
+ NAMESERVER = os.Getenv("NAMESERVER")
+ BROKER_ADDR = os.Getenv("BROKER_ADDR")
+ CLUSTER_NAME = os.Getenv("CLUSTER_NAME")
+ ACCESS_KEY = os.Getenv("ACCESS_KEY")
+ SECRET_KEY = os.Getenv("SECRET_KEY")
+)
+
+func init() {
+ os.Setenv("mq.consoleAppender.enabled", "true")
+ rmq_client.ResetLogger()
+}
+func BuildProducerWithDefaultInfo(topic ...string) rmq_client.Producer {
+
+ producer, err := rmq_client.NewProducer(&rmq_client.Config{
+ Endpoint: NAMESERVER,
+ Credentials: &credentials.SessionCredentials{
+ AccessKey: ACCESS_KEY,
+ AccessSecret: SECRET_KEY,
+ },
+ },
+ rmq_client.WithTopics(topic...),
+ )
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ // start producer
+ err = producer.Start()
+ if err != nil {
+ log.Fatal(err)
+ }
+ return producer
+}
+
+func BuildProducer(nameserver string, ak string, sk string, topic ...string)
rmq_client.Producer {
+
+ // new producer instance
+ producer, err := rmq_client.NewProducer(&rmq_client.Config{
+ Endpoint: nameserver,
+ Credentials: &credentials.SessionCredentials{
+ AccessKey: ak,
+ AccessSecret: sk,
+ },
+ },
+ rmq_client.WithTopics(topic...),
+ )
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ // start producer
+ err = producer.Start()
+ if err != nil {
+ log.Fatal(err)
+ }
+ return producer
+}
+
+func BuildTransactionProducer(nameserver string, ak string, sk string, checker
*rmq_client.TransactionChecker, topics ...string) rmq_client.Producer {
+
+ producer, err := rmq_client.NewProducer(&rmq_client.Config{
+ Endpoint: nameserver,
+ Credentials: &credentials.SessionCredentials{
+ AccessKey: ak,
+ AccessSecret: sk,
+ },
+ },
+ rmq_client.WithTransactionChecker(checker),
+ rmq_client.WithTopics(topics...),
+ )
+ if err != nil {
+ log.Fatal(err)
+ }
+ // start producer
+ err = producer.Start()
+ if err != nil {
+ log.Fatal(err)
+ }
+ return producer
+}
+
+func BuildSimpleConsumer(nameserver string, consumerGroup string,
filterExpression string, ak string, sk string, topic string)
rmq_client.SimpleConsumer {
+
+ simpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{
+ Endpoint: nameserver,
+ ConsumerGroup: consumerGroup,
+ Credentials: &credentials.SessionCredentials{
+ AccessKey: ak,
+ AccessSecret: sk,
+ },
+ },
+ rmq_client.WithAwaitDuration(awaitDuration),
+
rmq_client.WithSubscriptionExpressions(map[string]*rmq_client.FilterExpression{
+ topic: rmq_client.NewFilterExpression(filterExpression),
+ }),
+ )
+ if err != nil {
+ log.Fatal(err)
+ }
+ // start simpleConsumer
+ err = simpleConsumer.Start()
+ if err != nil {
+ log.Fatal(err)
+ }
+ return simpleConsumer
+}
+
+func BuildNormalMessage(topic string, body string, tag string, keys ...string)
*rmq_client.Message {
+ msg := &rmq_client.Message{
+ Topic: topic,
+ Body: []byte(body),
+ }
+ msg.SetKeys(keys...)
+ msg.SetTag(tag)
+ return msg
+}
+
+func BuildDelayMessage(topic string, body string, tag string, delaySeconds
time.Duration, keys ...string) *rmq_client.Message {
+ msg := BuildNormalMessage(topic, body, tag, keys...)
+ msg.SetDelayTimestamp(time.Now().Add(time.Second * delaySeconds))
+ return msg
+}
+
+func BuildFIFOMessage(topic string, body string, tag string, consumerGroup
string, keys ...string) *rmq_client.Message {
+ msg := BuildNormalMessage(topic, body, tag, keys...)
+ msg.SetMessageGroup(consumerGroup)
+ return msg
+}
+
+func SendMessage(producer rmq_client.Producer, message *rmq_client.Message,
sendMsgCollector *SendMsgsCollector) {
+ resp, err := producer.Send(context.TODO(), message)
+ if err != nil {
+ log.Fatal(err)
+ }
+ for i := 0; i < len(resp); i++ {
+ sendMsgCollector.MsgIds = append(sendMsgCollector.MsgIds,
resp[i].MessageID)
+ sendMsgCollector.SendMsgs = append(sendMsgCollector.SendMsgs,
message)
+ fmt.Printf("%#v\n", resp[i])
+ }
+}
+
+func SendMessageAsync(producer rmq_client.Producer, message
*rmq_client.Message, sendMsgCollector *SendMsgsCollector) {
+ // send message in async
+ producer.SendAsync(context.TODO(), message, func(ctx context.Context,
resp []*rmq_client.SendReceipt, err error) {
+ if err != nil {
+ log.Fatal(err)
+ }
+ for i := 0; i < len(resp); i++ {
+ sendMsgCollector.MsgIds =
append(sendMsgCollector.MsgIds, resp[i].MessageID)
+ sendMsgCollector.SendMsgs =
append(sendMsgCollector.SendMsgs, message)
+ fmt.Printf("%#v\n", resp[i])
+ }
+ })
+}
+
+func SendNormalMessage(producer rmq_client.Producer, topic string, body
string, tag string, sendNum int, keys ...string) *SendMsgsCollector {
+ sendMsgCollector := NewSendMsgsCollector()
+ for i := 0; i < sendNum; i++ {
+ // new a message
+ msg := BuildNormalMessage(topic, body, tag, keys...)
+ // send message in sync
+ resp, err := producer.Send(context.TODO(), msg)
+ if err != nil {
+ log.Fatal(err)
+ }
+ for i := 0; i < len(resp); i++ {
+ sendMsgCollector.MsgIds =
append(sendMsgCollector.MsgIds, resp[i].MessageID)
+ sendMsgCollector.SendMsgs =
append(sendMsgCollector.SendMsgs, msg)
+ fmt.Printf("%#v\n", resp[i])
+ }
+ }
+ return sendMsgCollector
+}
+
+func SendTransactionMessage(producer rmq_client.Producer, topic string, body
string, tag string, sendNum int, keys ...string) *SendMsgsCollector {
+ sendMsgCollector := NewSendMsgsCollector()
+ for i := 0; i < sendNum; i++ {
+ // new a message
+ msg := BuildNormalMessage(topic, body, tag, keys...)
+ // send message in sync
+ transaction := producer.BeginTransaction()
+ resp, err := producer.SendWithTransaction(context.TODO(), msg,
transaction)
+ if err != nil {
+ log.Fatal(err)
+ }
+ // commit transaction message directly
+ err = transaction.Commit()
+ if err != nil {
+ log.Fatal(err)
+ }
+ for i := 0; i < len(resp); i++ {
+ sendMsgCollector.MsgIds =
append(sendMsgCollector.MsgIds, resp[i].MessageID)
+ sendMsgCollector.SendMsgs =
append(sendMsgCollector.SendMsgs, msg)
+ fmt.Printf("%#v\n", resp[i])
+ }
+ }
+ return sendMsgCollector
+}
+
+func SendNormalMessageAsync(producer rmq_client.Producer, topic string, body
string, tag string, sendNum int, keys ...string) *SendMsgsCollector {
+ sendMsgCollector := NewSendMsgsCollector()
+ // new a message
+ msg := BuildNormalMessage(topic, body, tag, keys...)
+ for i := 0; i < sendNum; i++ {
+ // send message in async
+ producer.SendAsync(context.TODO(), msg, func(ctx
context.Context, resp []*rmq_client.SendReceipt, err error) {
+ if err != nil {
+ log.Fatal(err)
+ }
+ for i := 0; i < len(resp); i++ {
+ sendMsgCollector.MsgIds =
append(sendMsgCollector.MsgIds, resp[i].MessageID)
+ sendMsgCollector.SendMsgs =
append(sendMsgCollector.SendMsgs, msg)
+ fmt.Printf("%#v\n", resp[i])
+ }
+ })
+ }
+ return sendMsgCollector
+}
+
+func RecvMessage(simpleConsumer rmq_client.SimpleConsumer, maxMessageNum
int32, invisibleDuration time.Duration, pollSeconds int64) *RecvMsgsCollector {
+ recvMsgCollector := NewRecvMsgsCollector()
+ start := time.Now().Unix()
+ for {
+ mvs, err := simpleConsumer.Receive(context.TODO(),
maxMessageNum, invisibleDuration)
+ if err != nil {
+ fmt.Println(err)
+ if strings.Contains(err.Error(), "MESSAGE_NOT_FOUND") {
+ break
+ }
+ }
+ // ack message
+ for _, mv := range mvs {
+ simpleConsumer.Ack(context.TODO(), mv)
+ recvMsgCollector.MsgIds =
append(recvMsgCollector.MsgIds, mv.GetMessageId())
+ recvMsgCollector.RecvMsgViews =
append(recvMsgCollector.RecvMsgViews, mv)
+ fmt.Println(mv)
+ }
+ if time.Now().Unix()-start > pollSeconds {
+ break
+ }
+ }
+ return recvMsgCollector
+}
+
+func RecvMessageWithNum(simpleConsumer rmq_client.SimpleConsumer,
maxMessageNum int32, invisibleDuration time.Duration, pollSeconds int64,
recvNum int) *RecvMsgsCollector {
+ recvMsgCollector := NewRecvMsgsCollector()
+ start := time.Now().Unix()
+ for {
+ mvs, err := simpleConsumer.Receive(context.TODO(),
maxMessageNum, invisibleDuration)
+ if err != nil {
+ fmt.Println(err)
+ if strings.Contains(err.Error(), "MESSAGE_NOT_FOUND") {
+ break
+ }
+ }
+ // ack message
+ for _, mv := range mvs {
+ simpleConsumer.Ack(context.TODO(), mv)
+ recvMsgCollector.MsgIds =
append(recvMsgCollector.MsgIds, mv.GetMessageId())
+ recvMsgCollector.RecvMsgViews =
append(recvMsgCollector.RecvMsgViews, mv)
+ fmt.Println(mv)
+ }
+ if time.Now().Unix()-start > pollSeconds ||
len(recvMsgCollector.MsgIds) >= recvNum {
+ break
+ }
+ }
+ return recvMsgCollector
+}
diff --git a/golang/utils/MQAdminUtils.go b/golang/utils/MQAdminUtils.go
new file mode 100644
index 0000000..a9332a1
--- /dev/null
+++ b/golang/utils/MQAdminUtils.go
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+ "fmt"
+ "log"
+ "os/exec"
+ "path"
+ "runtime"
+)
+
+var root string
+
+func init() {
+ _, filename, _, ok := runtime.Caller(0)
+ // get whole project base path
+ root = path.Dir(path.Dir(path.Dir(filename)))
+ if !ok {
+ log.Fatal("get project root path failed")
+ }
+}
+
+func CreateTopic(topicName string, brokerAddr string, clusterName string,
nameserver string) {
+ // use absolute path
+ command := "sh " + root + "/rocketmq-admintools/bin/mqadmin updateTopic
-t " + topicName
+ if nameserver != "" {
+ command += " -n " + nameserver
+ }
+ if brokerAddr != "" {
+ command += " -b " + brokerAddr
+ }
+ if clusterName != "" {
+ command += " -c " + clusterName
+ }
+ fmt.Println(command)
+ out, err := exec.Command("/bin/bash", "-c", command).Output()
+ if err != nil {
+ log.Fatal(err)
+ }
+ fmt.Println(string(out))
+}
+
+func CreateDelayTopic(topicName string, brokerAddr string, clusterName string,
nameserver string) {
+ // use absolute path
+ command := "sh " + root + "/rocketmq-admintools/bin/mqadmin updateTopic
-t " + topicName
+ if nameserver != "" {
+ command += " -n " + nameserver
+ }
+ if brokerAddr != "" {
+ command += " -b " + brokerAddr
+ }
+ if clusterName != "" {
+ command += " -c " + clusterName
+ }
+ command += " -a " + "+message.type=DELAY"
+ fmt.Println(command)
+ out, err := exec.Command("/bin/bash", "-c", command).Output()
+ if err != nil {
+ log.Fatal(err)
+ }
+ fmt.Println(string(out))
+}
+
+func CreateFIFOTopic(topicName string, brokerAddr string, clusterName string,
nameserver string) {
+ // use absolute path
+ command := "sh " + root + "/rocketmq-admintools/bin/mqadmin updateTopic
-t " + topicName
+ if nameserver != "" {
+ command += " -n " + nameserver
+ }
+ if brokerAddr != "" {
+ command += " -b " + brokerAddr
+ }
+ if clusterName != "" {
+ command += " -c " + clusterName
+ }
+ command += " -a " + "+message.type=FIFO"
+ fmt.Println(command)
+ out, err := exec.Command("/bin/bash", "-c", command).Output()
+ if err != nil {
+ log.Fatal(err)
+ }
+ fmt.Println(string(out))
+}
+
+func CreateTransactionTopic(topicName string, brokerAddr string, clusterName
string, nameserver string) {
+ // use absolute path
+ command := "sh " + root + "/rocketmq-admintools/bin/mqadmin updateTopic
-t " + topicName
+ if nameserver != "" {
+ command += " -n " + nameserver
+ }
+ if brokerAddr != "" {
+ command += " -b " + brokerAddr
+ }
+ if clusterName != "" {
+ command += " -c " + clusterName
+ }
+ command += " -a " + "+message.type=TRANSACTION"
+ fmt.Println(command)
+ out, err := exec.Command("/bin/bash", "-c", command).Output()
+ if err != nil {
+ log.Fatal(err)
+ }
+ fmt.Println(string(out))
+}
+
+func CreateOrderlyConsumerGroup(consumerGroup string, brokerAddr string,
clusterName string, nameserver string) {
+ // use absolute path
+ command := "sh " + root + "/rocketmq-admintools/bin/mqadmin
updateSubGroup -g " + consumerGroup
+ if nameserver != "" {
+ command += " -n " + nameserver
+ }
+ if brokerAddr != "" {
+ command += " -b " + brokerAddr
+ }
+ if clusterName != "" {
+ command += " -c " + clusterName
+ }
+ command += " -s true -o true -m false -d false "
+ fmt.Println(command)
+ out, err := exec.Command("/bin/bash", "-c", command).Output()
+ if err != nil {
+ log.Fatal(err)
+ }
+ fmt.Println(string(out))
+}
+
+func ClusterList(nameserver string) string {
+ command := "sh " + root + "/rocketmq-admintools/bin/mqadmin clusterlist"
+ if nameserver != "" {
+ command += " -n " + nameserver
+ }
+ fmt.Println(command)
+ out, err := exec.Command("/bin/bash", "-c", command).Output()
+ if err != nil {
+ log.Fatal(err)
+ }
+ return string(out)
+}
diff --git a/golang/utils/MsgCheck.go b/golang/utils/MsgCheck.go
new file mode 100644
index 0000000..21a159e
--- /dev/null
+++ b/golang/utils/MsgCheck.go
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+ rmq_client "github.com/apache/rocketmq-clients/golang"
+)
+
+type SendMsgsCollector struct {
+ MsgIds []string
+ SendMsgs []*rmq_client.Message
+}
+
+func NewSendMsgsCollector() *SendMsgsCollector {
+ return &SendMsgsCollector{
+ MsgIds: make([]string, 0),
+ SendMsgs: make([]*rmq_client.Message, 0),
+ }
+}
+
+type RecvMsgsCollector struct {
+ MsgIds []string
+ RecvMsgViews []*rmq_client.MessageView
+}
+
+func NewRecvMsgsCollector() *RecvMsgsCollector {
+ return &RecvMsgsCollector{
+ MsgIds: make([]string, 0),
+ RecvMsgViews: make([]*rmq_client.MessageView, 0),
+ }
+}
diff --git a/golang/utils/NameUtils.go b/golang/utils/NameUtils.go
new file mode 100644
index 0000000..852b8e5
--- /dev/null
+++ b/golang/utils/NameUtils.go
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+ "math/rand"
+ "time"
+)
+
+func RandomString(n int) string {
+ var letter =
[]rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
+ b := make([]rune, n)
+ rand.Seed(time.Now().UnixNano())
+ for i := range b {
+ b[i] = letter[rand.Intn(len(letter))]
+ }
+ return string(b)
+}
+
+func GetTopicName() string {
+ return "topic-" + RandomString(8)
+}
+
+func GetGroupName() string {
+ return "group-" + RandomString(8)
+}