This is an automated email from the ASF dual-hosted git repository.
yuzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new c9e197c [ISSUE #950] support tls connection (#951)
c9e197c is described below
commit c9e197c3af45f846ba528cbc05f2228039be5f3e
Author: yuz10 <[email protected]>
AuthorDate: Mon Nov 6 10:19:16 2023 +0800
[ISSUE #950] support tls connection (#951)
* [ISSUE #950] support tls connection
* add tls to admin options
* support go 1.13
---
admin/admin.go | 6 ++++
consumer/option.go | 6 ++++
examples/consumer/tls/main.go | 59 ++++++++++++++++++++++++++++++++++++++
examples/producer/tls/main.go | 62 ++++++++++++++++++++++++++++++++++++++++
internal/remote/remote_client.go | 1 +
internal/remote/tcp_conn.go | 13 +++++++--
producer/option.go | 6 ++++
7 files changed, 151 insertions(+), 2 deletions(-)
diff --git a/admin/admin.go b/admin/admin.go
index 487c8b4..62175af 100644
--- a/admin/admin.go
+++ b/admin/admin.go
@@ -77,6 +77,12 @@ func WithNamespace(namespace string) AdminOption {
}
}
+func WithTls(useTls bool) AdminOption {
+ return func(options *adminOptions) {
+ options.ClientOptions.RemotingClientConfig.UseTls = useTls
+ }
+}
+
type admin struct {
cli internal.RMQClient
diff --git a/consumer/option.go b/consumer/option.go
index ac7dd93..24acf7c 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -381,3 +381,9 @@ func WithLimiter(limiter Limiter) Option {
opts.Limiter = limiter
}
}
+
+func WithTls(useTls bool) Option {
+ return func(opts *consumerOptions) {
+ opts.ClientOptions.RemotingClientConfig.UseTls = useTls
+ }
+}
diff --git a/examples/consumer/tls/main.go b/examples/consumer/tls/main.go
new file mode 100644
index 0000000..248c837
--- /dev/null
+++ b/examples/consumer/tls/main.go
@@ -0,0 +1,59 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/apache/rocketmq-client-go/v2"
+ "github.com/apache/rocketmq-client-go/v2/consumer"
+ "github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+func main() {
+ c, _ := rocketmq.NewPushConsumer(
+ consumer.WithGroupName("testGroup"),
+ consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ consumer.WithTls(true),
+ )
+ err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
+ msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
+ for i := range msgs {
+ fmt.Printf("subscribe callback: %v \n", msgs[i])
+ }
+
+ return consumer.ConsumeSuccess, nil
+ })
+ if err != nil {
+ fmt.Println(err.Error())
+ }
+ // Note: start after subscribe
+ err = c.Start()
+ if err != nil {
+ fmt.Println(err.Error())
+ os.Exit(-1)
+ }
+ time.Sleep(time.Hour)
+ err = c.Shutdown()
+ if err != nil {
+ fmt.Printf("shutdown Consumer error: %s", err.Error())
+ }
+}
diff --git a/examples/producer/tls/main.go b/examples/producer/tls/main.go
new file mode 100644
index 0000000..c926c05
--- /dev/null
+++ b/examples/producer/tls/main.go
@@ -0,0 +1,62 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "strconv"
+
+ "github.com/apache/rocketmq-client-go/v2"
+ "github.com/apache/rocketmq-client-go/v2/primitive"
+ "github.com/apache/rocketmq-client-go/v2/producer"
+)
+
+// Package main implements a simple producer to send message.
+func main() {
+ p, _ := rocketmq.NewProducer(
+ producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+ producer.WithRetry(2),
+ producer.WithTls(true),
+ )
+ err := p.Start()
+ if err != nil {
+ fmt.Printf("start producer error: %s", err.Error())
+ os.Exit(1)
+ }
+ topic := "test"
+
+ for i := 0; i < 10; i++ {
+ msg := &primitive.Message{
+ Topic: topic,
+ Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
+ }
+ res, err := p.SendSync(context.Background(), msg)
+
+ if err != nil {
+ fmt.Printf("send message error: %s\n", err)
+ } else {
+ fmt.Printf("send message success: result=%s\n", res.String())
+ }
+ }
+ err = p.Shutdown()
+ if err != nil {
+ fmt.Printf("shutdown producer error: %s", err.Error())
+ }
+}
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 4013689..45dfbbf 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -38,6 +38,7 @@ type TcpOption struct {
ConnectionTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
+ UseTls bool
}
//go:generate mockgen -source remote_client.go -destination mock_remote_client.go -self_package github.com/apache/rocketmq-client-go/v2/internal/remote --package remote RemotingClient
diff --git a/internal/remote/tcp_conn.go b/internal/remote/tcp_conn.go
index 93ed837..3b7d164 100644
--- a/internal/remote/tcp_conn.go
+++ b/internal/remote/tcp_conn.go
@@ -18,6 +18,7 @@ package remote
import (
"context"
+ "crypto/tls"
"net"
"sync"
"time"
@@ -34,11 +35,19 @@ type tcpConnWrapper struct {
func initConn(ctx context.Context, addr string, config *RemotingClientConfig) (*tcpConnWrapper, error) {
var d net.Dialer
-
d.KeepAlive = config.KeepAliveDuration
d.Deadline = time.Now().Add(config.ConnectionTimeout)
- conn, err := d.DialContext(ctx, "tcp", addr)
+ var conn net.Conn
+ var err error
+ if config.UseTls {
+ conn, err = tls.DialWithDialer(&d, "tcp", addr, &tls.Config{
+ InsecureSkipVerify: true,
+ })
+ } else {
+ conn, err = d.DialContext(ctx, "tcp", addr)
+ }
+
if err != nil {
return nil, err
}
diff --git a/producer/option.go b/producer/option.go
index c3a0dc4..6e43cc2 100644
--- a/producer/option.go
+++ b/producer/option.go
@@ -178,3 +178,9 @@ func WithCompressLevel(level int) Option {
opts.CompressLevel = level
}
}
+
+func WithTls(useTls bool) Option {
+ return func(opts *producerOptions) {
+ opts.ClientOptions.RemotingClientConfig.UseTls = useTls
+ }
+}