This is an automated email from the ASF dual-hosted git repository.
yuzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new f2fb16d unmarshalMsgID support ipv6 (#1203)
f2fb16d is described below
commit f2fb16d33e689a1c57ea7681314d147ad61a3704
Author: SHI <[email protected]>
AuthorDate: Wed Nov 12 15:17:07 2025 +0800
unmarshalMsgID support ipv6 (#1203)
Co-authored-by: shixiaoxiao <[email protected]>
---
internal/utils/net.go | 5 +++--
primitive/message.go | 39 +++++++++++++++++++++++++++++++++------
primitive/message_test.go | 18 ++++++++++++++++++
3 files changed, 54 insertions(+), 8 deletions(-)
diff --git a/internal/utils/net.go b/internal/utils/net.go
index a4eeb56..1588148 100644
--- a/internal/utils/net.go
+++ b/internal/utils/net.go
@@ -20,10 +20,11 @@ package utils
import (
"bytes"
"fmt"
- "github.com/apache/rocketmq-client-go/v2/errors"
"net"
"strconv"
"time"
+
+ "github.com/apache/rocketmq-client-go/v2/errors"
)
var (
@@ -66,5 +67,5 @@ func FakeIP() []byte {
}
func GetAddressByBytes(data []byte) string {
- return net.IPv4(data[0], data[1], data[2], data[3]).String()
+ return net.IP(data).String()
}
diff --git a/primitive/message.go b/primitive/message.go
index 8b390c4..123ded5 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -482,19 +482,46 @@ func UnmarshalMsgID(id []byte) (*MessageID, error) {
if len(id) < 32 {
return nil, fmt.Errorf("%s len < 32", string(id))
}
+
var (
ipBytes = make([]byte, 4)
portBytes = make([]byte, 4)
offsetBytes = make([]byte, 8)
)
- hex.Decode(ipBytes, id[0:8])
- hex.Decode(portBytes, id[8:16])
- hex.Decode(offsetBytes, id[16:32])
+ if len(id) == 32 {
+ hex.Decode(ipBytes, id[0:8])
+ hex.Decode(portBytes, id[8:16])
+ hex.Decode(offsetBytes, id[16:32])
+ } else {
+ ipBytes = make([]byte, 16)
+ portBytes = make([]byte, 4)
+ offsetBytes = make([]byte, 8)
+ hex.Decode(ipBytes, id[0:32])
+ hex.Decode(portBytes, id[32:40])
+ hex.Decode(offsetBytes, id[40:56])
+ }
+
+ addr := utils.GetAddressByBytes(ipBytes)
+ port := int(binary.BigEndian.Uint32(portBytes))
+ offset := int64(binary.BigEndian.Uint64(offsetBytes))
+
+ if addr == "" {
+ return nil, fmt.Errorf("addr is empty")
+ }
+
+ if port < 0 || port > 65535 {
+ return nil, fmt.Errorf("port > 65535, acutal port is %d", port)
+ }
+
+ if len(id) != 32 {
+ // DialContext requires ipv6 format: [ipv6]:port
+ addr = fmt.Sprintf("[%s]", addr)
+ }
return &MessageID{
- Addr: utils.GetAddressByBytes(ipBytes),
- Port: int(binary.BigEndian.Uint32(portBytes)),
- Offset: int64(binary.BigEndian.Uint64(offsetBytes)),
+ Addr: addr,
+ Port: port,
+ Offset: offset,
}, nil
}
diff --git a/primitive/message_test.go b/primitive/message_test.go
index 7248df7..65cc03b 100644
--- a/primitive/message_test.go
+++ b/primitive/message_test.go
@@ -37,6 +37,24 @@ func TestMessageID(t *testing.T) {
t.Log(msgID)
}
+func TestIpv6MessageID(t *testing.T) {
+ id := []byte("FDBDDC4100010136C800000000000024000078BF0000000000004F45")
+ msgID, err := UnmarshalMsgID(id)
+ if err != nil {
+ t.Fatalf("unmarshal msg id error, ms is: %s", err.Error())
+ }
+ if msgID.Addr != "[fdbd:dc41:1:136:c800::24]" {
+ t.Fatalf("parse messageID %s error", id)
+ }
+ if msgID.Port != 30911 {
+ t.Fatalf("parse messageID %s error", id)
+ }
+ if msgID.Offset != 20293 {
+ t.Fatalf("parse messageID %s error", id)
+ }
+ t.Log(msgID)
+}
+
func TestMessageKey(t *testing.T) {
msg := &Message{}
expected := "testKey"