This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 310fb94d Bump the minimum Go version to 1.22 (#1300)
310fb94d is described below

commit 310fb94d29624e380b88968f3c2d030f12a1c4af
Author: Yunze Xu <[email protected]>
AuthorDate: Sat Oct 26 09:34:35 2024 +0800

    Bump the minimum Go version to 1.22 (#1300)
---
 .github/workflows/ci.yml                           |   8 +-
 Dockerfile                                         |   2 +-
 Makefile                                           |   2 +-
 go.mod                                             |   2 +-
 oauth2/auth_suite_test.go                          |  10 +-
 oauth2/authorization_tokenretriever_test.go        | 170 ++++++++++-----------
 oauth2/client_credentials_flow_test.go             |  55 ++++---
 oauth2/clock/testing/fake_clock.go                 |   8 +-
 oauth2/config_tokenprovider_test.go                |  28 ++--
 oauth2/device_code_flow_test.go                    |  70 ++++-----
 oauth2/oidc_endpoint_provider_test.go              |  42 ++---
 perf/perf-consumer.go                              |   2 +-
 perf/perf-producer.go                              |   4 +-
 perf/pulsar-perf-go.go                             |   2 +-
 pulsar/ack_grouping_tracker.go                     |   2 +-
 pulsar/ack_grouping_tracker_test.go                |   4 +-
 pulsar/auth/athenz_test.go                         |   8 +-
 pulsar/auth/disabled.go                            |   4 +-
 pulsar/auth/oauth2_test.go                         |   6 +-
 pulsar/auth/token.go                               |   3 +-
 pulsar/client_impl_test.go                         |   6 +-
 pulsar/consumer_impl.go                            |   3 +-
 pulsar/consumer_multitopic.go                      |   4 +-
 pulsar/consumer_partition.go                       |  31 ++--
 pulsar/consumer_test.go                            |  70 ++++-----
 pulsar/crypto/default_message_crypto.go            |   2 +-
 pulsar/dlq_router.go                               |   7 +-
 pulsar/helper_for_test.go                          |   4 +-
 pulsar/internal/channel_cond_test.go               |   4 +-
 pulsar/internal/compression/noop.go                |   2 +-
 pulsar/internal/compression/zstd_cgo.go            |   2 +-
 pulsar/internal/compression/zstd_go.go             |   2 +-
 pulsar/internal/crypto/consumer_decryptor.go       |   2 +-
 pulsar/internal/crypto/noop_decryptor.go           |   2 +-
 pulsar/internal/crypto/noop_encryptor.go           |   2 +-
 pulsar/internal/http_client.go                     |   4 +-
 pulsar/internal/lookup_service.go                  |   2 +-
 pulsar/internal/lookup_service_test.go             |  36 ++---
 .../internal/pulsartracing/consumer_interceptor.go |   4 +-
 .../pulsartracing/consumer_interceptor_test.go     |  26 ++--
 .../pulsartracing/message_carrier_adaptors.go      |   8 +-
 .../pulsartracing/message_carrier_util_test.go     |   2 +-
 .../internal/pulsartracing/producer_interceptor.go |   6 +-
 .../pulsartracing/producer_interceptor_test.go     |   2 +-
 pulsar/log/log.go                                  |  44 +++---
 pulsar/negative_acks_tracker.go                    |  10 +-
 pulsar/negative_acks_tracker_test.go               |  64 ++++----
 pulsar/producer_partition.go                       |  95 ++++++------
 pulsar/producer_test.go                            |  56 +++----
 pulsar/reader_test.go                              |   4 +-
 pulsar/retry_router.go                             |   9 +-
 pulsar/table_view_test.go                          |   6 +-
 pulsar/transaction_impl.go                         |   4 +-
 pulsaradmin/pkg/admin/admin_test.go                |   2 +-
 pulsaradmin/pkg/admin/auth/oauth2.go               |   4 +-
 pulsaradmin/pkg/admin/auth/oauth2_test.go          |   6 +-
 pulsaradmin/pkg/admin/namespace.go                 |   6 +-
 pulsaradmin/pkg/admin/subscription_test.go         |   2 +-
 pulsaradmin/pkg/utils/data.go                      |   2 +-
 pulsaradmin/pkg/utils/schema_util.go               |   6 +-
 60 files changed, 488 insertions(+), 497 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c0b77c60..52781048 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -22,7 +22,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        go-version: [ '1.21', '1.22' ]
+        go-version: [ '1.22', '1.23' ]
     steps:
       - uses: actions/checkout@v3
       - uses: actions/setup-go@v5
@@ -36,19 +36,19 @@ jobs:
       - uses: actions/checkout@v3
       - uses: actions/setup-go@v5
         with:
-          go-version: '1.20'
+          go-version: '1.22'
       - name: Check license header
         run: docker run --rm -v $(pwd):/github/workspace 
ghcr.io/korandoru/hawkeye-native:v3 check
       - name: Run golangci-lint
         uses: golangci/golangci-lint-action@v6
         with:
-          version: v1.51.2
+          version: v1.61.0
 
   integration-tests:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        go-version: [ '1.21', '1.22' ]
+        go-version: [ '1.22', '1.23' ]
     steps:
       - uses: actions/checkout@v3
       - name: clean docker cache
diff --git a/Dockerfile b/Dockerfile
index fecfa98f..8895c076 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -19,7 +19,7 @@
 # set via the Makefile or CLI
 ARG PULSAR_IMAGE=apachepulsar/pulsar:latest
 
-ARG GO_VERSION=1.20
+ARG GO_VERSION=1.22
 FROM golang:$GO_VERSION as golang
 
 FROM $PULSAR_IMAGE
diff --git a/Makefile b/Makefile
index 23844c41..5c38303e 100644
--- a/Makefile
+++ b/Makefile
@@ -20,7 +20,7 @@
 IMAGE_NAME = pulsar-client-go-test:latest
 PULSAR_VERSION ?= 3.2.2
 PULSAR_IMAGE = apachepulsar/pulsar:$(PULSAR_VERSION)
-GO_VERSION ?= 1.21
+GO_VERSION ?= 1.22
 CONTAINER_ARCH ?= $(shell uname -m | sed s/x86_64/amd64/)
 
 # Golang standard bin directory.
diff --git a/go.mod b/go.mod
index 1a7025fd..c2f3457a 100644
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,6 @@
 module github.com/apache/pulsar-client-go
 
-go 1.21
+go 1.22
 
 require (
        github.com/99designs/keyring v1.2.1
diff --git a/oauth2/auth_suite_test.go b/oauth2/auth_suite_test.go
index 95accff2..54b24299 100644
--- a/oauth2/auth_suite_test.go
+++ b/oauth2/auth_suite_test.go
@@ -21,13 +21,13 @@ import (
        "context"
        "testing"
 
-       . "github.com/onsi/ginkgo"
-       . "github.com/onsi/gomega"
+       "github.com/onsi/ginkgo"
+       "github.com/onsi/gomega"
 )
 
 func TestAuth(t *testing.T) {
-       RegisterFailHandler(Fail)
-       RunSpecs(t, "cloud-cli Auth Suite")
+       gomega.RegisterFailHandler(ginkgo.Fail)
+       ginkgo.RunSpecs(t, "cloud-cli Auth Suite")
 }
 
 type MockTokenExchanger struct {
@@ -52,7 +52,7 @@ func (te *MockTokenExchanger) ExchangeClientCredentials(req 
ClientCredentialsExc
        return te.ReturnsTokens, te.ReturnsError
 }
 
-func (te *MockTokenExchanger) ExchangeDeviceCode(ctx context.Context,
+func (te *MockTokenExchanger) ExchangeDeviceCode(_ context.Context,
        req DeviceCodeExchangeRequest) (*TokenResult, error) {
        te.CalledWithRequest = &req
        return te.ReturnsTokens, te.ReturnsError
diff --git a/oauth2/authorization_tokenretriever_test.go 
b/oauth2/authorization_tokenretriever_test.go
index 6ae55d8f..d57f9dc3 100644
--- a/oauth2/authorization_tokenretriever_test.go
+++ b/oauth2/authorization_tokenretriever_test.go
@@ -26,8 +26,8 @@ import (
        "strings"
        "time"
 
-       . "github.com/onsi/ginkgo"
-       . "github.com/onsi/gomega"
+       "github.com/onsi/ginkgo"
+       "github.com/onsi/gomega"
 )
 
 type MockTransport struct {
@@ -37,7 +37,7 @@ type MockTransport struct {
 
 var _ HTTPAuthTransport = &MockTransport{}
 
-func (t *MockTransport) Do(req *http.Request) (*http.Response, error) {
+func (t *MockTransport) Do(_ *http.Request) (*http.Response, error) {
        if len(t.Responses) > 0 {
                r := t.Responses[0]
                t.Responses = t.Responses[1:]
@@ -46,9 +46,9 @@ func (t *MockTransport) Do(req *http.Request) 
(*http.Response, error) {
        return nil, t.ReturnError
 }
 
-var _ = Describe("CodetokenExchanger", func() {
-       Describe("newExchangeCodeRequest", func() {
-               It("creates the request", func() {
+var _ = ginkgo.Describe("CodetokenExchanger", func() {
+       ginkgo.Describe("newExchangeCodeRequest", func() {
+               ginkgo.It("creates the request", func() {
                        tokenRetriever := TokenRetriever{}
                        exchangeRequest := AuthorizationCodeExchangeRequest{
                                TokenEndpoint: "https://issuer/oauth/token";,
@@ -62,32 +62,32 @@ var _ = Describe("CodetokenExchanger", func() {
 
                        result.ParseForm()
 
-                       Expect(err).To(BeNil())
-                       
Expect(result.FormValue("grant_type")).To(Equal("authorization_code"))
-                       
Expect(result.FormValue("client_id")).To(Equal("clientID"))
-                       
Expect(result.FormValue("code_verifier")).To(Equal("Verifier"))
-                       Expect(result.FormValue("code")).To(Equal("code"))
-                       
Expect(result.FormValue("redirect_uri")).To(Equal("https://redirect";))
-                       
Expect(result.URL.String()).To(Equal("https://issuer/oauth/token";))
+                       gomega.Expect(err).To(gomega.BeNil())
+                       
gomega.Expect(result.FormValue("grant_type")).To(gomega.Equal("authorization_code"))
+                       
gomega.Expect(result.FormValue("client_id")).To(gomega.Equal("clientID"))
+                       
gomega.Expect(result.FormValue("code_verifier")).To(gomega.Equal("Verifier"))
+                       
gomega.Expect(result.FormValue("code")).To(gomega.Equal("code"))
+                       
gomega.Expect(result.FormValue("redirect_uri")).To(gomega.Equal("https://redirect";))
+                       
gomega.Expect(result.URL.String()).To(gomega.Equal("https://issuer/oauth/token";))
 
-                       
Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded"))
-                       
Expect(result.Header.Get("Content-Length")).To(Equal("117"))
+                       
gomega.Expect(result.Header.Get("Content-Type")).To(gomega.Equal("application/x-www-form-urlencoded"))
+                       
gomega.Expect(result.Header.Get("Content-Length")).To(gomega.Equal("117"))
                })
 
-               It("returns an error when NewRequest returns an error", func() {
+               ginkgo.It("returns an error when NewRequest returns an error", 
func() {
                        tokenRetriever := TokenRetriever{}
 
                        result, err := 
tokenRetriever.newExchangeCodeRequest(AuthorizationCodeExchangeRequest{
                                TokenEndpoint: "://issuer/oauth/token",
                        })
 
-                       Expect(result).To(BeNil())
-                       Expect(err.Error()).To(Equal("parse 
\"://issuer/oauth/token\": missing protocol scheme"))
+                       gomega.Expect(result).To(gomega.BeNil())
+                       gomega.Expect(err.Error()).To(gomega.Equal("parse 
\"://issuer/oauth/token\": missing protocol scheme"))
                })
        })
 
-       Describe("handleAuthTokensResponse", func() {
-               It("handles the response", func() {
+       ginkgo.Describe("handleAuthTokensResponse", func() {
+               ginkgo.It("handles the response", func() {
                        tokenRetriever := TokenRetriever{}
                        response := buildResponse(200, 
AuthorizationTokenResponse{
                                ExpiresIn:    1,
@@ -97,49 +97,49 @@ var _ = Describe("CodetokenExchanger", func() {
 
                        result, err := 
tokenRetriever.handleAuthTokensResponse(response)
 
-                       Expect(err).To(BeNil())
-                       Expect(result).To(Equal(&TokenResult{
+                       gomega.Expect(err).To(gomega.BeNil())
+                       gomega.Expect(result).To(gomega.Equal(&TokenResult{
                                ExpiresIn:    1,
                                AccessToken:  "myAccessToken",
                                RefreshToken: "myRefreshToken",
                        }))
                })
 
-               It("returns error when status code is not successful", func() {
+               ginkgo.It("returns error when status code is not successful", 
func() {
                        tokenRetriever := TokenRetriever{}
                        response := buildResponse(500, nil)
 
                        result, err := 
tokenRetriever.handleAuthTokensResponse(response)
 
-                       Expect(result).To(BeNil())
-                       Expect(err.Error()).To(Not(BeNil()))
+                       gomega.Expect(result).To(gomega.BeNil())
+                       
gomega.Expect(err.Error()).To(gomega.Not(gomega.BeNil()))
                })
 
-               It("returns typed error when response body contains error 
information", func() {
+               ginkgo.It("returns typed error when response body contains 
error information", func() {
                        errorBody := TokenErrorResponse{Error: "test", 
ErrorDescription: "test description"}
                        tokenRetriever := TokenRetriever{}
                        response := buildResponse(400, errorBody)
 
                        result, err := 
tokenRetriever.handleAuthTokensResponse(response)
 
-                       Expect(result).To(BeNil())
-                       Expect(err).To(Equal(&TokenError{ErrorCode: "test", 
ErrorDescription: "test description"}))
-                       Expect(err.Error()).To(Equal("test description (test)"))
+                       gomega.Expect(result).To(gomega.BeNil())
+                       
gomega.Expect(err).To(gomega.Equal(&TokenError{ErrorCode: "test", 
ErrorDescription: "test description"}))
+                       gomega.Expect(err.Error()).To(gomega.Equal("test 
description (test)"))
                })
 
-               It("returns error when deserialization fails", func() {
+               ginkgo.It("returns error when deserialization fails", func() {
                        tokenRetriever := TokenRetriever{}
                        response := buildResponse(200, "")
 
                        result, err := 
tokenRetriever.handleAuthTokensResponse(response)
-                       Expect(result).To(BeNil())
-                       Expect(err.Error()).To(Equal(
+                       gomega.Expect(result).To(gomega.BeNil())
+                       gomega.Expect(err.Error()).To(gomega.Equal(
                                "json: cannot unmarshal string into Go value of 
type oauth2.AuthorizationTokenResponse"))
                })
        })
 
-       Describe("newRefreshTokenRequest", func() {
-               It("creates the request", func() {
+       ginkgo.Describe("newRefreshTokenRequest", func() {
+               ginkgo.It("creates the request", func() {
                        tokenRetriever := TokenRetriever{}
                        exchangeRequest := RefreshTokenExchangeRequest{
                                TokenEndpoint: "https://issuer/oauth/token";,
@@ -151,30 +151,30 @@ var _ = Describe("CodetokenExchanger", func() {
 
                        result.ParseForm()
 
-                       Expect(err).To(BeNil())
-                       
Expect(result.FormValue("grant_type")).To(Equal("refresh_token"))
-                       
Expect(result.FormValue("client_id")).To(Equal("clientID"))
-                       
Expect(result.FormValue("refresh_token")).To(Equal("refreshToken"))
-                       
Expect(result.URL.String()).To(Equal("https://issuer/oauth/token";))
+                       gomega.Expect(err).To(gomega.BeNil())
+                       
gomega.Expect(result.FormValue("grant_type")).To(gomega.Equal("refresh_token"))
+                       
gomega.Expect(result.FormValue("client_id")).To(gomega.Equal("clientID"))
+                       
gomega.Expect(result.FormValue("refresh_token")).To(gomega.Equal("refreshToken"))
+                       
gomega.Expect(result.URL.String()).To(gomega.Equal("https://issuer/oauth/token";))
 
-                       
Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded"))
-                       
Expect(result.Header.Get("Content-Length")).To(Equal("70"))
+                       
gomega.Expect(result.Header.Get("Content-Type")).To(gomega.Equal("application/x-www-form-urlencoded"))
+                       
gomega.Expect(result.Header.Get("Content-Length")).To(gomega.Equal("70"))
                })
 
-               It("returns an error when NewRequest returns an error", func() {
+               ginkgo.It("returns an error when NewRequest returns an error", 
func() {
                        tokenRetriever := TokenRetriever{}
 
                        result, err := 
tokenRetriever.newRefreshTokenRequest(RefreshTokenExchangeRequest{
                                TokenEndpoint: "://issuer/oauth/token",
                        })
 
-                       Expect(result).To(BeNil())
-                       Expect(err.Error()).To(Equal("parse 
\"://issuer/oauth/token\": missing protocol scheme"))
+                       gomega.Expect(result).To(gomega.BeNil())
+                       gomega.Expect(err.Error()).To(gomega.Equal("parse 
\"://issuer/oauth/token\": missing protocol scheme"))
                })
        })
 
-       Describe("newClientCredentialsRequest", func() {
-               It("creates the request", func() {
+       ginkgo.Describe("newClientCredentialsRequest", func() {
+               ginkgo.It("creates the request", func() {
                        tokenRetriever := TokenRetriever{}
                        exchangeRequest := ClientCredentialsExchangeRequest{
                                TokenEndpoint: "https://issuer/oauth/token";,
@@ -187,31 +187,31 @@ var _ = Describe("CodetokenExchanger", func() {
 
                        result.ParseForm()
 
-                       Expect(err).To(BeNil())
-                       
Expect(result.FormValue("grant_type")).To(Equal("client_credentials"))
-                       
Expect(result.FormValue("client_id")).To(Equal("clientID"))
-                       
Expect(result.FormValue("client_secret")).To(Equal("clientSecret"))
-                       
Expect(result.FormValue("audience")).To(Equal("audience"))
-                       
Expect(result.URL.String()).To(Equal("https://issuer/oauth/token";))
+                       gomega.Expect(err).To(gomega.BeNil())
+                       
gomega.Expect(result.FormValue("grant_type")).To(gomega.Equal("client_credentials"))
+                       
gomega.Expect(result.FormValue("client_id")).To(gomega.Equal("clientID"))
+                       
gomega.Expect(result.FormValue("client_secret")).To(gomega.Equal("clientSecret"))
+                       
gomega.Expect(result.FormValue("audience")).To(gomega.Equal("audience"))
+                       
gomega.Expect(result.URL.String()).To(gomega.Equal("https://issuer/oauth/token";))
 
-                       
Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded"))
-                       
Expect(result.Header.Get("Content-Length")).To(Equal("93"))
+                       
gomega.Expect(result.Header.Get("Content-Type")).To(gomega.Equal("application/x-www-form-urlencoded"))
+                       
gomega.Expect(result.Header.Get("Content-Length")).To(gomega.Equal("93"))
                })
 
-               It("returns an error when NewRequest returns an error", func() {
+               ginkgo.It("returns an error when NewRequest returns an error", 
func() {
                        tokenRetriever := TokenRetriever{}
 
                        result, err := 
tokenRetriever.newClientCredentialsRequest(ClientCredentialsExchangeRequest{
                                TokenEndpoint: "://issuer/oauth/token",
                        })
 
-                       Expect(result).To(BeNil())
-                       Expect(err.Error()).To(Equal("parse 
\"://issuer/oauth/token\": missing protocol scheme"))
+                       gomega.Expect(result).To(gomega.BeNil())
+                       gomega.Expect(err.Error()).To(gomega.Equal("parse 
\"://issuer/oauth/token\": missing protocol scheme"))
                })
        })
 
-       Describe("newDeviceCodeExchangeRequest", func() {
-               It("creates the request", func() {
+       ginkgo.Describe("newDeviceCodeExchangeRequest", func() {
+               ginkgo.It("creates the request", func() {
                        tokenRetriever := TokenRetriever{}
                        exchangeRequest := DeviceCodeExchangeRequest{
                                TokenEndpoint: "https://issuer/oauth/token";,
@@ -224,35 +224,35 @@ var _ = Describe("CodetokenExchanger", func() {
 
                        result.ParseForm()
 
-                       Expect(err).To(BeNil())
-                       
Expect(result.FormValue("grant_type")).To(Equal("urn:ietf:params:oauth:grant-type:device_code"))
-                       
Expect(result.FormValue("client_id")).To(Equal("clientID"))
-                       
Expect(result.FormValue("device_code")).To(Equal("deviceCode"))
-                       
Expect(result.URL.String()).To(Equal("https://issuer/oauth/token";))
+                       gomega.Expect(err).To(gomega.BeNil())
+                       
gomega.Expect(result.FormValue("grant_type")).To(gomega.Equal("urn:ietf:params:oauth:grant-type:device_code"))
+                       
gomega.Expect(result.FormValue("client_id")).To(gomega.Equal("clientID"))
+                       
gomega.Expect(result.FormValue("device_code")).To(gomega.Equal("deviceCode"))
+                       
gomega.Expect(result.URL.String()).To(gomega.Equal("https://issuer/oauth/token";))
 
-                       
Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded"))
-                       
Expect(result.Header.Get("Content-Length")).To(Equal("107"))
+                       
gomega.Expect(result.Header.Get("Content-Type")).To(gomega.Equal("application/x-www-form-urlencoded"))
+                       
gomega.Expect(result.Header.Get("Content-Length")).To(gomega.Equal("107"))
                })
 
-               It("returns an error when NewRequest returns an error", func() {
+               ginkgo.It("returns an error when NewRequest returns an error", 
func() {
                        tokenRetriever := TokenRetriever{}
 
                        result, err := 
tokenRetriever.newClientCredentialsRequest(ClientCredentialsExchangeRequest{
                                TokenEndpoint: "://issuer/oauth/token",
                        })
 
-                       Expect(result).To(BeNil())
-                       Expect(err.Error()).To(Equal("parse 
\"://issuer/oauth/token\": missing protocol scheme"))
+                       gomega.Expect(result).To(gomega.BeNil())
+                       gomega.Expect(err.Error()).To(gomega.Equal("parse 
\"://issuer/oauth/token\": missing protocol scheme"))
                })
        })
 
-       Describe("ExchangeDeviceCode", func() {
+       ginkgo.Describe("ExchangeDeviceCode", func() {
                var mockTransport *MockTransport
                var tokenRetriever *TokenRetriever
                var exchangeRequest DeviceCodeExchangeRequest
                var tokenResult TokenResult
 
-               BeforeEach(func() {
+               ginkgo.BeforeEach(func() {
                        mockTransport = &MockTransport{}
                        tokenRetriever = &TokenRetriever{
                                transport: mockTransport,
@@ -270,21 +270,21 @@ var _ = Describe("CodetokenExchanger", func() {
                        }
                })
 
-               It("returns a token", func() {
+               ginkgo.It("returns a token", func() {
                })
 
-               It("supports cancellation", func() {
+               ginkgo.It("supports cancellation", func() {
                        mockTransport.Responses = []*http.Response{
                                buildResponse(400, 
&TokenErrorResponse{"authorization_pending", ""}),
                        }
                        ctx, cancel := context.WithCancel(context.Background())
                        cancel()
                        _, err := tokenRetriever.ExchangeDeviceCode(ctx, 
exchangeRequest)
-                       Expect(err).ToNot(BeNil())
-                       Expect(err.Error()).To(Equal("cancelled"))
+                       gomega.Expect(err).ToNot(gomega.BeNil())
+                       gomega.Expect(err.Error()).To(gomega.Equal("cancelled"))
                })
 
-               It("implements authorization_pending and slow_down", func() {
+               ginkgo.It("implements authorization_pending and slow_down", 
func() {
                        startTime := time.Now()
                        mockTransport.Responses = []*http.Response{
                                buildResponse(400, 
&TokenErrorResponse{"authorization_pending", ""}),
@@ -293,28 +293,28 @@ var _ = Describe("CodetokenExchanger", func() {
                                buildResponse(200, &tokenResult),
                        }
                        token, err := 
tokenRetriever.ExchangeDeviceCode(context.Background(), exchangeRequest)
-                       Expect(err).To(BeNil())
-                       Expect(token).To(Equal(&tokenResult))
+                       gomega.Expect(err).To(gomega.BeNil())
+                       gomega.Expect(token).To(gomega.Equal(&tokenResult))
                        endTime := time.Now()
-                       Expect(endTime.Sub(startTime)).To(BeNumerically(">", 
exchangeRequest.PollInterval*3))
+                       
gomega.Expect(endTime.Sub(startTime)).To(gomega.BeNumerically(">", 
exchangeRequest.PollInterval*3))
                })
 
-               It("implements expired_token", func() {
+               ginkgo.It("implements expired_token", func() {
                        mockTransport.Responses = []*http.Response{
                                buildResponse(400, 
&TokenErrorResponse{"expired_token", ""}),
                        }
                        _, err := 
tokenRetriever.ExchangeDeviceCode(context.Background(), exchangeRequest)
-                       Expect(err).ToNot(BeNil())
-                       Expect(err.Error()).To(Equal("the device code has 
expired"))
+                       gomega.Expect(err).ToNot(gomega.BeNil())
+                       gomega.Expect(err.Error()).To(gomega.Equal("the device 
code has expired"))
                })
 
-               It("implements access_denied", func() {
+               ginkgo.It("implements access_denied", func() {
                        mockTransport.Responses = []*http.Response{
                                buildResponse(400, 
&TokenErrorResponse{"access_denied", ""}),
                        }
                        _, err := 
tokenRetriever.ExchangeDeviceCode(context.Background(), exchangeRequest)
-                       Expect(err).ToNot(BeNil())
-                       Expect(err.Error()).To(Equal("the device was not 
authorized"))
+                       gomega.Expect(err).ToNot(gomega.BeNil())
+                       gomega.Expect(err.Error()).To(gomega.Equal("the device 
was not authorized"))
                })
        })
 })
diff --git a/oauth2/client_credentials_flow_test.go 
b/oauth2/client_credentials_flow_test.go
index 6d9db354..8fd0a110 100644
--- a/oauth2/client_credentials_flow_test.go
+++ b/oauth2/client_credentials_flow_test.go
@@ -23,9 +23,8 @@ import (
 
        "github.com/apache/pulsar-client-go/oauth2/clock"
        "github.com/apache/pulsar-client-go/oauth2/clock/testing"
-
-       . "github.com/onsi/ginkgo"
-       . "github.com/onsi/gomega"
+       "github.com/onsi/ginkgo"
+       "github.com/onsi/gomega"
 )
 
 type MockClientCredentialsProvider struct {
@@ -50,13 +49,13 @@ var clientCredentials = KeyFile{
        Scope:        "test_scope",
 }
 
-var _ = Describe("ClientCredentialsFlow", func() {
-       Describe("Authorize", func() {
+var _ = ginkgo.Describe("ClientCredentialsFlow", func() {
+       ginkgo.Describe("Authorize", func() {
 
                var mockClock clock.Clock
                var mockTokenExchanger *MockTokenExchanger
 
-               BeforeEach(func() {
+               ginkgo.BeforeEach(func() {
                        mockClock = testing.NewFakeClock(time.Unix(0, 0))
                        expectedTokens := TokenResult{AccessToken: 
"accessToken", RefreshToken: "refreshToken", ExpiresIn: 1234}
                        mockTokenExchanger = &MockTokenExchanger{
@@ -64,7 +63,7 @@ var _ = Describe("ClientCredentialsFlow", func() {
                        }
                })
 
-               It("invokes TokenExchanger with credentials", func() {
+               ginkgo.It("invokes TokenExchanger with credentials", func() {
                        additionalScope := "additional_scope"
                        provider := newClientCredentialsFlow(
                                ClientCredentialsFlowOptions{
@@ -78,8 +77,8 @@ var _ = Describe("ClientCredentialsFlow", func() {
                        )
 
                        _, err := provider.Authorize("test_audience")
-                       Expect(err).ToNot(HaveOccurred())
-                       
Expect(mockTokenExchanger.CalledWithRequest).To(Equal(&ClientCredentialsExchangeRequest{
+                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
+                       
gomega.Expect(mockTokenExchanger.CalledWithRequest).To(gomega.Equal(&ClientCredentialsExchangeRequest{
                                TokenEndpoint: oidcEndpoints.TokenEndpoint,
                                ClientID:      clientCredentials.ClientID,
                                ClientSecret:  clientCredentials.ClientSecret,
@@ -88,7 +87,7 @@ var _ = Describe("ClientCredentialsFlow", func() {
                        }))
                })
 
-               It("returns TokensResult from TokenExchanger", func() {
+               ginkgo.It("returns TokensResult from TokenExchanger", func() {
                        provider := newClientCredentialsFlow(
                                ClientCredentialsFlowOptions{
                                        KeyFile: "test_keyfile",
@@ -100,12 +99,12 @@ var _ = Describe("ClientCredentialsFlow", func() {
                        )
 
                        grant, err := provider.Authorize("test_audience")
-                       Expect(err).ToNot(HaveOccurred())
+                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
                        expected := 
convertToOAuth2Token(mockTokenExchanger.ReturnsTokens, mockClock)
-                       Expect(*grant.Token).To(Equal(expected))
+                       gomega.Expect(*grant.Token).To(gomega.Equal(expected))
                })
 
-               It("returns an error if token exchanger errors", func() {
+               ginkgo.It("returns an error if token exchanger errors", func() {
                        mockTokenExchanger.ReturnsError = 
errors.New("someerror")
                        mockTokenExchanger.ReturnsTokens = nil
 
@@ -120,19 +119,19 @@ var _ = Describe("ClientCredentialsFlow", func() {
                        )
 
                        _, err := provider.Authorize("test_audience")
-                       Expect(err.Error()).To(Equal("authentication failed 
using client credentials: " +
+                       
gomega.Expect(err.Error()).To(gomega.Equal("authentication failed using client 
credentials: " +
                                "could not exchange client credentials: 
someerror"))
                })
        })
 })
 
-var _ = Describe("ClientCredentialsGrantRefresher", func() {
+var _ = ginkgo.Describe("ClientCredentialsGrantRefresher", func() {
 
-       Describe("Refresh", func() {
+       ginkgo.Describe("Refresh", func() {
                var mockClock clock.Clock
                var mockTokenExchanger *MockTokenExchanger
 
-               BeforeEach(func() {
+               ginkgo.BeforeEach(func() {
                        mockClock = testing.NewFakeClock(time.Unix(0, 0))
                        expectedTokens := TokenResult{AccessToken: 
"accessToken", RefreshToken: "refreshToken", ExpiresIn: 1234}
                        mockTokenExchanger = &MockTokenExchanger{
@@ -140,7 +139,7 @@ var _ = Describe("ClientCredentialsGrantRefresher", func() {
                        }
                })
 
-               It("invokes TokenExchanger with credentials", func() {
+               ginkgo.It("invokes TokenExchanger with credentials", func() {
                        refresher := &ClientCredentialsGrantRefresher{
                                clock:     mockClock,
                                exchanger: mockTokenExchanger,
@@ -154,8 +153,8 @@ var _ = Describe("ClientCredentialsGrantRefresher", func() {
                                Scopes:            []string{"profile"},
                        }
                        _, err := refresher.Refresh(og)
-                       Expect(err).ToNot(HaveOccurred())
-                       
Expect(mockTokenExchanger.CalledWithRequest).To(Equal(&ClientCredentialsExchangeRequest{
+                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
+                       
gomega.Expect(mockTokenExchanger.CalledWithRequest).To(gomega.Equal(&ClientCredentialsExchangeRequest{
                                TokenEndpoint: oidcEndpoints.TokenEndpoint,
                                ClientID:      clientCredentials.ClientID,
                                ClientSecret:  clientCredentials.ClientSecret,
@@ -164,7 +163,7 @@ var _ = Describe("ClientCredentialsGrantRefresher", func() {
                        }))
                })
 
-               It("returns a valid grant", func() {
+               ginkgo.It("returns a valid grant", func() {
                        refresher := &ClientCredentialsGrantRefresher{
                                clock:     mockClock,
                                exchanger: mockTokenExchanger,
@@ -178,14 +177,14 @@ var _ = Describe("ClientCredentialsGrantRefresher", 
func() {
                                Scopes:            []string{"profile"},
                        }
                        ng, err := refresher.Refresh(og)
-                       Expect(err).ToNot(HaveOccurred())
-                       Expect(ng.Audience).To(Equal("test_audience"))
-                       Expect(ng.ClientID).To(Equal(""))
-                       
Expect(*ng.ClientCredentials).To(Equal(clientCredentials))
-                       
Expect(ng.TokenEndpoint).To(Equal(oidcEndpoints.TokenEndpoint))
+                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
+                       
gomega.Expect(ng.Audience).To(gomega.Equal("test_audience"))
+                       gomega.Expect(ng.ClientID).To(gomega.Equal(""))
+                       
gomega.Expect(*ng.ClientCredentials).To(gomega.Equal(clientCredentials))
+                       
gomega.Expect(ng.TokenEndpoint).To(gomega.Equal(oidcEndpoints.TokenEndpoint))
                        expected := 
convertToOAuth2Token(mockTokenExchanger.ReturnsTokens, mockClock)
-                       Expect(*ng.Token).To(Equal(expected))
-                       Expect(ng.Scopes).To(Equal([]string{"profile"}))
+                       gomega.Expect(*ng.Token).To(gomega.Equal(expected))
+                       
gomega.Expect(ng.Scopes).To(gomega.Equal([]string{"profile"}))
                })
        })
 })
diff --git a/oauth2/clock/testing/fake_clock.go 
b/oauth2/clock/testing/fake_clock.go
index 6fcbf4c0..1a0bacff 100644
--- a/oauth2/clock/testing/fake_clock.go
+++ b/oauth2/clock/testing/fake_clock.go
@@ -196,24 +196,24 @@ func (i *IntervalClock) Since(ts time.Time) time.Duration 
{
 
 // After is unimplemented, will panic.
 // TODO: make interval clock use FakeClock so this can be implemented.
-func (*IntervalClock) After(d time.Duration) <-chan time.Time {
+func (*IntervalClock) After(_ time.Duration) <-chan time.Time {
        panic("IntervalClock doesn't implement After")
 }
 
 // NewTimer is unimplemented, will panic.
 // TODO: make interval clock use FakeClock so this can be implemented.
-func (*IntervalClock) NewTimer(d time.Duration) clock.Timer {
+func (*IntervalClock) NewTimer(_ time.Duration) clock.Timer {
        panic("IntervalClock doesn't implement NewTimer")
 }
 
 // Tick is unimplemented, will panic.
 // TODO: make interval clock use FakeClock so this can be implemented.
-func (*IntervalClock) Tick(d time.Duration) <-chan time.Time {
+func (*IntervalClock) Tick(_ time.Duration) <-chan time.Time {
        panic("IntervalClock doesn't implement Tick")
 }
 
 // Sleep is unimplemented, will panic.
-func (*IntervalClock) Sleep(d time.Duration) {
+func (*IntervalClock) Sleep(_ time.Duration) {
        panic("IntervalClock doesn't implement Sleep")
 }
 
diff --git a/oauth2/config_tokenprovider_test.go 
b/oauth2/config_tokenprovider_test.go
index d949a5a5..bfb56bfb 100644
--- a/oauth2/config_tokenprovider_test.go
+++ b/oauth2/config_tokenprovider_test.go
@@ -18,8 +18,8 @@
 package oauth2
 
 import (
-       . "github.com/onsi/ginkgo"
-       . "github.com/onsi/gomega"
+       "github.com/onsi/ginkgo"
+       "github.com/onsi/gomega"
 )
 
 type mockConfigProvider struct {
@@ -42,15 +42,15 @@ func (m *mockConfigProvider) SaveTokens(identifier, 
accessToken, refreshToken st
        m.SavedRefreshToken = refreshToken
 }
 
-var _ = Describe("main", func() {
-       Describe("configCachingProvider", func() {
-               It("sets up the identifier using the clientID and audience", 
func() {
+var _ = ginkgo.Describe("main", func() {
+       ginkgo.Describe("configCachingProvider", func() {
+               ginkgo.It("sets up the identifier using the clientID and 
audience", func() {
                        p := NewConfigBackedCachingProvider("iamclientid", 
"iamaudience", &mockConfigProvider{})
 
-                       
Expect(p.identifier).To(Equal("iamclientid-iamaudience"))
+                       
gomega.Expect(p.identifier).To(gomega.Equal("iamclientid-iamaudience"))
                })
 
-               It("gets tokens from the config provider", func() {
+               ginkgo.It("gets tokens from the config provider", func() {
                        c := &mockConfigProvider{
                                ReturnAccessToken:  "accessToken",
                                ReturnRefreshToken: "refreshToken",
@@ -62,15 +62,15 @@ var _ = Describe("main", func() {
 
                        r, err := p.GetTokens()
 
-                       Expect(err).NotTo(HaveOccurred())
-                       
Expect(c.GetTokensCalledIdentifier).To(Equal(p.identifier))
-                       Expect(r).To(Equal(&TokenResult{
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+                       
gomega.Expect(c.GetTokensCalledIdentifier).To(gomega.Equal(p.identifier))
+                       gomega.Expect(r).To(gomega.Equal(&TokenResult{
                                AccessToken:  c.ReturnAccessToken,
                                RefreshToken: c.ReturnRefreshToken,
                        }))
                })
 
-               It("caches the tokens in the config provider", func() {
+               ginkgo.It("caches the tokens in the config provider", func() {
                        c := &mockConfigProvider{}
                        p := ConfigBackedCachingProvider{
                                identifier: "iamidentifier",
@@ -83,9 +83,9 @@ var _ = Describe("main", func() {
 
                        p.CacheTokens(toSave)
 
-                       Expect(c.SavedIdentifier).To(Equal(p.identifier))
-                       Expect(c.SavedAccessToken).To(Equal(toSave.AccessToken))
-                       
Expect(c.SavedRefreshToken).To(Equal(toSave.RefreshToken))
+                       
gomega.Expect(c.SavedIdentifier).To(gomega.Equal(p.identifier))
+                       
gomega.Expect(c.SavedAccessToken).To(gomega.Equal(toSave.AccessToken))
+                       
gomega.Expect(c.SavedRefreshToken).To(gomega.Equal(toSave.RefreshToken))
                })
        })
 })
diff --git a/oauth2/device_code_flow_test.go b/oauth2/device_code_flow_test.go
index a238a484..deb09003 100644
--- a/oauth2/device_code_flow_test.go
+++ b/oauth2/device_code_flow_test.go
@@ -25,8 +25,8 @@ import (
        "github.com/apache/pulsar-client-go/oauth2/clock/testing"
        "golang.org/x/oauth2"
 
-       . "github.com/onsi/ginkgo"
-       . "github.com/onsi/gomega"
+       "github.com/onsi/ginkgo"
+       "github.com/onsi/gomega"
 )
 
 type MockDeviceCodeProvider struct {
@@ -59,9 +59,9 @@ func (c *MockDeviceCodeCallback) Callback(code 
*DeviceCodeResult) error {
        return nil
 }
 
-var _ = Describe("DeviceCodeFlow", func() {
+var _ = ginkgo.Describe("DeviceCodeFlow", func() {
 
-       Describe("Authorize", func() {
+       ginkgo.Describe("Authorize", func() {
                const audience = "test_clientID"
 
                var mockClock clock.Clock
@@ -70,7 +70,7 @@ var _ = Describe("DeviceCodeFlow", func() {
                var mockCallback *MockDeviceCodeCallback
                var flow *DeviceCodeFlow
 
-               BeforeEach(func() {
+               ginkgo.BeforeEach(func() {
                        mockClock = testing.NewFakeClock(time.Unix(0, 0))
 
                        mockCodeProvider = &MockDeviceCodeProvider{
@@ -107,21 +107,21 @@ var _ = Describe("DeviceCodeFlow", func() {
                        )
                })
 
-               It("invokes DeviceCodeProvider", func() {
+               ginkgo.It("invokes DeviceCodeProvider", func() {
                        _, _ = flow.Authorize(audience)
-                       Expect(mockCodeProvider.Called).To(BeTrue())
-                       
Expect(mockCodeProvider.CalledWithAdditionalScopes).To(ContainElement("offline_access"))
+                       
gomega.Expect(mockCodeProvider.Called).To(gomega.BeTrue())
+                       
gomega.Expect(mockCodeProvider.CalledWithAdditionalScopes).To(gomega.ContainElement("offline_access"))
                })
 
-               It("invokes callback with returned code", func() {
+               ginkgo.It("invokes callback with returned code", func() {
                        _, _ = flow.Authorize(audience)
-                       Expect(mockCallback.Called).To(BeTrue())
-                       
Expect(mockCallback.DeviceCodeResult).To(Equal(mockCodeProvider.DeviceCodeResult))
+                       gomega.Expect(mockCallback.Called).To(gomega.BeTrue())
+                       
gomega.Expect(mockCallback.DeviceCodeResult).To(gomega.Equal(mockCodeProvider.DeviceCodeResult))
                })
 
-               It("invokes TokenExchanger with returned code", func() {
+               ginkgo.It("invokes TokenExchanger with returned code", func() {
                        _, _ = flow.Authorize(audience)
-                       
Expect(mockTokenExchanger.CalledWithRequest).To(Equal(&DeviceCodeExchangeRequest{
+                       
gomega.Expect(mockTokenExchanger.CalledWithRequest).To(gomega.Equal(&DeviceCodeExchangeRequest{
                                TokenEndpoint: oidcEndpoints.TokenEndpoint,
                                ClientID:      "test_clientID",
                                PollInterval:  time.Duration(5) * time.Second,
@@ -129,28 +129,28 @@ var _ = Describe("DeviceCodeFlow", func() {
                        }))
                })
 
-               It("returns an authorization grant", func() {
+               ginkgo.It("returns an authorization grant", func() {
                        grant, _ := flow.Authorize(audience)
-                       Expect(grant).ToNot(BeNil())
-                       Expect(grant.Audience).To(Equal(audience))
-                       Expect(grant.ClientID).To(Equal("test_clientID"))
-                       Expect(grant.ClientCredentials).To(BeNil())
-                       
Expect(grant.TokenEndpoint).To(Equal(oidcEndpoints.TokenEndpoint))
+                       gomega.Expect(grant).ToNot(gomega.BeNil())
+                       gomega.Expect(grant.Audience).To(gomega.Equal(audience))
+                       
gomega.Expect(grant.ClientID).To(gomega.Equal("test_clientID"))
+                       
gomega.Expect(grant.ClientCredentials).To(gomega.BeNil())
+                       
gomega.Expect(grant.TokenEndpoint).To(gomega.Equal(oidcEndpoints.TokenEndpoint))
                        expected := 
convertToOAuth2Token(mockTokenExchanger.ReturnsTokens, mockClock)
-                       Expect(*grant.Token).To(Equal(expected))
+                       gomega.Expect(*grant.Token).To(gomega.Equal(expected))
                })
        })
 })
 
-var _ = Describe("DeviceAuthorizationGrantRefresher", func() {
+var _ = ginkgo.Describe("DeviceAuthorizationGrantRefresher", func() {
 
-       Describe("Refresh", func() {
+       ginkgo.Describe("Refresh", func() {
                var mockClock clock.Clock
                var mockTokenExchanger *MockTokenExchanger
                var refresher *DeviceAuthorizationGrantRefresher
                var grant *AuthorizationGrant
 
-               BeforeEach(func() {
+               ginkgo.BeforeEach(func() {
                        mockClock = testing.NewFakeClock(time.Unix(0, 0))
 
                        mockTokenExchanger = &MockTokenExchanger{}
@@ -169,62 +169,62 @@ var _ = Describe("DeviceAuthorizationGrantRefresher", 
func() {
                        }
                })
 
-               It("invokes the token exchanger", func() {
+               ginkgo.It("invokes the token exchanger", func() {
                        mockTokenExchanger.ReturnsTokens = &TokenResult{
                                AccessToken: "new token",
                        }
 
                        _, _ = refresher.Refresh(grant)
-                       
Expect(*mockTokenExchanger.RefreshCalledWithRequest).To(Equal(RefreshTokenExchangeRequest{
+                       
gomega.Expect(*mockTokenExchanger.RefreshCalledWithRequest).To(gomega.Equal(RefreshTokenExchangeRequest{
                                TokenEndpoint: oidcEndpoints.TokenEndpoint,
                                ClientID:      grant.ClientID,
                                RefreshToken:  "grt",
                        }))
                })
 
-               It("returns the refreshed access token from the 
TokenExchanger", func() {
+               ginkgo.It("returns the refreshed access token from the 
TokenExchanger", func() {
                        mockTokenExchanger.ReturnsTokens = &TokenResult{
                                AccessToken: "new token",
                        }
 
                        grant, _ = refresher.Refresh(grant)
-                       
Expect(grant.Token.AccessToken).To(Equal(mockTokenExchanger.ReturnsTokens.AccessToken))
+                       
gomega.Expect(grant.Token.AccessToken).To(gomega.Equal(mockTokenExchanger.ReturnsTokens.AccessToken))
                })
 
-               It("preserves the existing refresh token from the 
TokenExchanger", func() {
+               ginkgo.It("preserves the existing refresh token from the 
TokenExchanger", func() {
                        mockTokenExchanger.ReturnsTokens = &TokenResult{
                                AccessToken: "new token",
                        }
 
                        grant, _ = refresher.Refresh(grant)
-                       Expect(grant.Token.RefreshToken).To(Equal("grt"))
+                       
gomega.Expect(grant.Token.RefreshToken).To(gomega.Equal("grt"))
                })
 
-               It("returns the refreshed refresh token from the 
TokenExchanger", func() {
+               ginkgo.It("returns the refreshed refresh token from the 
TokenExchanger", func() {
                        mockTokenExchanger.ReturnsTokens = &TokenResult{
                                AccessToken:  "new token",
                                RefreshToken: "new token",
                        }
 
                        grant, _ = refresher.Refresh(grant)
-                       Expect(grant.Token.RefreshToken).To(Equal("new token"))
+                       
gomega.Expect(grant.Token.RefreshToken).To(gomega.Equal("new token"))
                })
 
-               It("returns a meaningful expiration time", func() {
+               ginkgo.It("returns a meaningful expiration time", func() {
                        mockTokenExchanger.ReturnsTokens = &TokenResult{
                                AccessToken: "new token",
                                ExpiresIn:   60,
                        }
 
                        grant, _ = refresher.Refresh(grant)
-                       
Expect(grant.Token.Expiry).To(Equal(mockClock.Now().Add(time.Duration(60) * 
time.Second)))
+                       
gomega.Expect(grant.Token.Expiry).To(gomega.Equal(mockClock.Now().Add(time.Duration(60)
 * time.Second)))
                })
 
-               It("returns an error when TokenExchanger does", func() {
+               ginkgo.It("returns an error when TokenExchanger does", func() {
                        mockTokenExchanger.ReturnsError = 
errors.New("someerror")
 
                        _, err := refresher.Refresh(grant)
-                       Expect(err.Error()).To(Equal("could not exchange 
refresh token: someerror"))
+                       gomega.Expect(err.Error()).To(gomega.Equal("could not 
exchange refresh token: someerror"))
                })
        })
 })
diff --git a/oauth2/oidc_endpoint_provider_test.go 
b/oauth2/oidc_endpoint_provider_test.go
index 065f9277..307f0a9e 100644
--- a/oauth2/oidc_endpoint_provider_test.go
+++ b/oauth2/oidc_endpoint_provider_test.go
@@ -22,17 +22,17 @@ import (
        "net/http"
        "net/http/httptest"
 
-       . "github.com/onsi/ginkgo"
-       . "github.com/onsi/gomega"
+       "github.com/onsi/ginkgo"
+       "github.com/onsi/gomega"
 )
 
-var _ = Describe("GetOIDCWellKnownEndpointsFromIssuerURL", func() {
-       It("calls and gets the well known data from the correct endpoint for 
the issuer", func() {
+var _ = ginkgo.Describe("GetOIDCWellKnownEndpointsFromIssuerURL", func() {
+       ginkgo.It("calls and gets the well known data from the correct endpoint 
for the issuer", func() {
                var req *http.Request
                wkEndpointsResp := OIDCWellKnownEndpoints{
                        AuthorizationEndpoint: "the-auth-endpoint", 
TokenEndpoint: "the-token-endpoint"}
                responseBytes, err := json.Marshal(wkEndpointsResp)
-               Expect(err).ToNot(HaveOccurred())
+               gomega.Expect(err).ToNot(gomega.HaveOccurred())
 
                ts := httptest.NewServer(http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) {
                        req = r
@@ -45,31 +45,31 @@ var _ = Describe("GetOIDCWellKnownEndpointsFromIssuerURL", 
func() {
 
                endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL(ts.URL)
 
-               Expect(err).ToNot(HaveOccurred())
-               Expect(*endpoints).To(Equal(wkEndpointsResp))
-               
Expect(req.URL.Path).To(Equal("/.well-known/openid-configuration"))
+               gomega.Expect(err).ToNot(gomega.HaveOccurred())
+               gomega.Expect(*endpoints).To(gomega.Equal(wkEndpointsResp))
+               
gomega.Expect(req.URL.Path).To(gomega.Equal("/.well-known/openid-configuration"))
        })
 
-       It("errors when url.Parse errors", func() {
+       ginkgo.It("errors when url.Parse errors", func() {
                endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL("://")
 
-               Expect(err).To(HaveOccurred())
-               Expect(err.Error()).To(Equal(
+               gomega.Expect(err).To(gomega.HaveOccurred())
+               gomega.Expect(err.Error()).To(gomega.Equal(
                        "could not parse issuer url to build well known 
endpoints: parse \"://\": missing protocol scheme"))
-               Expect(endpoints).To(BeNil())
+               gomega.Expect(endpoints).To(gomega.BeNil())
        })
 
-       It("errors when the get errors", func() {
+       ginkgo.It("errors when the get errors", func() {
                endpoints, err := 
GetOIDCWellKnownEndpointsFromIssuerURL("https://";)
 
-               Expect(err).To(HaveOccurred())
-               Expect(err.Error()).To(Equal(
+               gomega.Expect(err).To(gomega.HaveOccurred())
+               gomega.Expect(err.Error()).To(gomega.Equal(
                        "could not get well known endpoints from url 
https://.well-known/openid-configuration: " +
                                "Get 
\"https://.well-known/openid-configuration\": dial tcp: lookup .well-known: no 
such host"))
-               Expect(endpoints).To(BeNil())
+               gomega.Expect(endpoints).To(gomega.BeNil())
        })
 
-       It("errors when the json decoder errors", func() {
+       ginkgo.It("errors when the json decoder errors", func() {
                var req *http.Request
 
                ts := httptest.NewServer(http.HandlerFunc(func(w 
http.ResponseWriter, r *http.Request) {
@@ -83,10 +83,10 @@ var _ = Describe("GetOIDCWellKnownEndpointsFromIssuerURL", 
func() {
 
                endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL(ts.URL)
 
-               Expect(err).To(HaveOccurred())
-               Expect(err.Error()).To(Equal("could not decode json body when 
getting well" +
+               gomega.Expect(err).To(gomega.HaveOccurred())
+               gomega.Expect(err.Error()).To(gomega.Equal("could not decode 
json body when getting well" +
                        " known endpoints: invalid character '<' looking for 
beginning of value"))
-               Expect(endpoints).To(BeNil())
-               
Expect(req.URL.Path).To(Equal("/.well-known/openid-configuration"))
+               gomega.Expect(endpoints).To(gomega.BeNil())
+               
gomega.Expect(req.URL.Path).To(gomega.Equal("/.well-known/openid-configuration"))
        })
 })
diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go
index 2172af53..5001d8d1 100644
--- a/perf/perf-consumer.go
+++ b/perf/perf-consumer.go
@@ -46,7 +46,7 @@ func newConsumerCommand() *cobra.Command {
                Use:   "consume <topic>",
                Short: "Consume from topic",
                Args:  cobra.ExactArgs(1),
-               Run: func(cmd *cobra.Command, args []string) {
+               Run: func(_ *cobra.Command, args []string) {
                        stop := stopCh()
                        if FlagProfile {
                                RunProfiling(stop)
diff --git a/perf/perf-producer.go b/perf/perf-producer.go
index f8e14d22..da0a499e 100644
--- a/perf/perf-producer.go
+++ b/perf/perf-producer.go
@@ -49,7 +49,7 @@ func newProducerCommand() *cobra.Command {
                Use:   "produce ",
                Short: "Produce on a topic and measure performance",
                Args:  cobra.ExactArgs(1),
-               Run: func(cmd *cobra.Command, args []string) {
+               Run: func(_ *cobra.Command, args []string) {
                        stop := stopCh()
                        if FlagProfile {
                                RunProfiling(stop)
@@ -125,7 +125,7 @@ func produce(produceArgs *ProduceArgs, stop <-chan 
struct{}) {
 
                        producer.SendAsync(ctx, &pulsar.ProducerMessage{
                                Payload: payload,
-                       }, func(msgID pulsar.MessageID, message 
*pulsar.ProducerMessage, e error) {
+                       }, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, 
e error) {
                                if e != nil {
                                        log.WithError(e).Fatal("Failed to 
publish")
                                }
diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go
index 40257e83..701ef412 100644
--- a/perf/pulsar-perf-go.go
+++ b/perf/pulsar-perf-go.go
@@ -88,7 +88,7 @@ func initLogger(debug bool) {
 
 func main() {
        rootCmd := &cobra.Command{
-               PersistentPreRun: func(cmd *cobra.Command, args []string) {
+               PersistentPreRun: func(_ *cobra.Command, _ []string) {
                        initLogger(flagDebug)
                },
                Use: "pulsar-perf-go",
diff --git a/pulsar/ack_grouping_tracker.go b/pulsar/ack_grouping_tracker.go
index 97d9e05a..c4ecc003 100644
--- a/pulsar/ack_grouping_tracker.go
+++ b/pulsar/ack_grouping_tracker.go
@@ -97,7 +97,7 @@ func (i *immediateAckGroupingTracker) addCumulative(id 
MessageID) {
        i.ackCumulative(id)
 }
 
-func (i *immediateAckGroupingTracker) isDuplicate(id MessageID) bool {
+func (i *immediateAckGroupingTracker) isDuplicate(_ MessageID) bool {
        return false
 }
 
diff --git a/pulsar/ack_grouping_tracker_test.go 
b/pulsar/ack_grouping_tracker_test.go
index 1fe5a568..57adf888 100644
--- a/pulsar/ack_grouping_tracker_test.go
+++ b/pulsar/ack_grouping_tracker_test.go
@@ -190,8 +190,8 @@ func TestTimedTrackerCumulativeAck(t *testing.T) {
 }
 
 func TestTimedTrackerIsDuplicate(t *testing.T) {
-       tracker := newAckGroupingTracker(nil, func(id MessageID) {}, func(id 
MessageID) {},
-               func(id []*pb.MessageIdData) {})
+       tracker := newAckGroupingTracker(nil, func(_ MessageID) {}, func(_ 
MessageID) {},
+               func(_ []*pb.MessageIdData) {})
 
        tracker.add(&messageID{batchIdx: 0, batchSize: 3})
        tracker.add(&messageID{batchIdx: 2, batchSize: 3})
diff --git a/pulsar/auth/athenz_test.go b/pulsar/auth/athenz_test.go
index 89118081..a4d1fe5d 100644
--- a/pulsar/auth/athenz_test.go
+++ b/pulsar/auth/athenz_test.go
@@ -48,13 +48,13 @@ type MockRoleToken struct {
        mock.Mock
 }
 
-func (m *MockTokenBuilder) SetExpiration(t time.Duration) {
+func (m *MockTokenBuilder) SetExpiration(_ time.Duration) {
 }
-func (m *MockTokenBuilder) SetHostname(h string) {
+func (m *MockTokenBuilder) SetHostname(_ string) {
 }
-func (m *MockTokenBuilder) SetIPAddress(ip string) {
+func (m *MockTokenBuilder) SetIPAddress(_ string) {
 }
-func (m *MockTokenBuilder) SetKeyService(keyService string) {
+func (m *MockTokenBuilder) SetKeyService(_ string) {
 }
 func (m *MockTokenBuilder) Token() zms.Token {
        result := m.Called()
diff --git a/pulsar/auth/disabled.go b/pulsar/auth/disabled.go
index 0389a39b..4614cbdf 100644
--- a/pulsar/auth/disabled.go
+++ b/pulsar/auth/disabled.go
@@ -49,7 +49,7 @@ func (disabled) Close() error {
        return nil
 }
 
-func (d disabled) RoundTrip(req *http.Request) (*http.Response, error) {
+func (d disabled) RoundTrip(_ *http.Request) (*http.Response, error) {
        return nil, nil
 }
 
@@ -57,6 +57,6 @@ func (d disabled) Transport() http.RoundTripper {
        return nil
 }
 
-func (d disabled) WithTransport(tripper http.RoundTripper) error {
+func (d disabled) WithTransport(_ http.RoundTripper) error {
        return nil
 }
diff --git a/pulsar/auth/oauth2_test.go b/pulsar/auth/oauth2_test.go
index 86d36404..cc3d11c1 100644
--- a/pulsar/auth/oauth2_test.go
+++ b/pulsar/auth/oauth2_test.go
@@ -35,7 +35,7 @@ func mockOAuthServer() *httptest.Server {
 
        // mock the used REST path for the tests
        mockedHandler := http.NewServeMux()
-       mockedHandler.HandleFunc("/.well-known/openid-configuration", 
func(writer http.ResponseWriter, request *http.Request) {
+       mockedHandler.HandleFunc("/.well-known/openid-configuration", 
func(writer http.ResponseWriter, _ *http.Request) {
                s := fmt.Sprintf(`{
     "issuer":"%s",
     "authorization_endpoint":"%s/authorize",
@@ -44,10 +44,10 @@ func mockOAuthServer() *httptest.Server {
 }`, server.URL, server.URL, server.URL, server.URL)
                fmt.Fprintln(writer, s)
        })
-       mockedHandler.HandleFunc("/oauth/token", func(writer 
http.ResponseWriter, request *http.Request) {
+       mockedHandler.HandleFunc("/oauth/token", func(writer 
http.ResponseWriter, _ *http.Request) {
                fmt.Fprintln(writer, "{\n  \"access_token\": 
\"token-content\",\n  \"token_type\": \"Bearer\"\n}")
        })
-       mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, 
request *http.Request) {
+       mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, 
_ *http.Request) {
                fmt.Fprintln(writer, "true")
        })
 
diff --git a/pulsar/auth/token.go b/pulsar/auth/token.go
index 898b6537..1d7fcff1 100644
--- a/pulsar/auth/token.go
+++ b/pulsar/auth/token.go
@@ -38,9 +38,8 @@ func NewAuthenticationTokenWithParams(params 
map[string]string) (Provider, error
                return NewAuthenticationToken(params["token"]), nil
        } else if params["file"] != "" {
                return NewAuthenticationTokenFromFile(params["file"]), nil
-       } else {
-               return nil, errors.New("missing configuration for token auth")
        }
+       return nil, errors.New("missing configuration for token auth")
 }
 
 // NewAuthenticationToken returns a token auth provider that will use the 
specified token to
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index 5b6b8f1f..0fdf6be7 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -257,7 +257,7 @@ func mockOAuthServer() *httptest.Server {
 
        // mock the used REST path for the tests
        mockedHandler := http.NewServeMux()
-       mockedHandler.HandleFunc("/.well-known/openid-configuration", 
func(writer http.ResponseWriter, request *http.Request) {
+       mockedHandler.HandleFunc("/.well-known/openid-configuration", 
func(writer http.ResponseWriter, _ *http.Request) {
                s := fmt.Sprintf(`{
     "issuer":"%s",
     "authorization_endpoint":"%s/authorize",
@@ -266,13 +266,13 @@ func mockOAuthServer() *httptest.Server {
 }`, server.URL, server.URL, server.URL, server.URL)
                fmt.Fprintln(writer, s)
        })
-       mockedHandler.HandleFunc("/oauth/token", func(writer 
http.ResponseWriter, request *http.Request) {
+       mockedHandler.HandleFunc("/oauth/token", func(writer 
http.ResponseWriter, _ *http.Request) {
                fmt.Fprintln(writer, "{\n"+
                        "  \"access_token\": 
\"eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0b2tlbi1wcmluY2lwYWwifQ."+
                        "tSfgR8l7dKC6LoWCxQgNkuSB8our7xV_nAM7wpgCbG4\",\n"+
                        "  \"token_type\": \"Bearer\"\n}")
        })
-       mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, 
request *http.Request) {
+       mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, 
_ *http.Request) {
                fmt.Fprintln(writer, "true")
        })
 
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index a3d3e3ff..92376ac1 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
        "context"
+       "errors"
        "fmt"
        "math/rand"
        "strconv"
@@ -492,7 +493,7 @@ func (c *consumer) unsubscribe(force bool) error {
                }
        }
        if errMsg != "" {
-               return fmt.Errorf(errMsg)
+               return errors.New(errMsg)
        }
        return nil
 }
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 020e38a0..3030bda3 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -294,11 +294,11 @@ func (c *multiTopicConsumer) Close() {
        })
 }
 
-func (c *multiTopicConsumer) Seek(msgID MessageID) error {
+func (c *multiTopicConsumer) Seek(_ MessageID) error {
        return newError(SeekFailed, "seek command not allowed for multi topic 
consumer")
 }
 
-func (c *multiTopicConsumer) SeekByTime(time time.Time) error {
+func (c *multiTopicConsumer) SeekByTime(_ time.Time) error {
        return newError(SeekFailed, "seek command not allowed for multi topic 
consumer")
 }
 
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 831f763a..0e8274e6 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -786,18 +786,18 @@ func (pc *partitionConsumer) NackMsg(msg Message) {
        pc.metrics.NacksCounter.Inc()
 }
 
-func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
+func (pc *partitionConsumer) Redeliver(msgIDs []messageID) {
        if state := pc.getConsumerState(); state == consumerClosed || state == 
consumerClosing {
                pc.log.WithField("state", state).Error("Failed to redeliver 
closing or closed consumer")
                return
        }
-       pc.eventsCh <- &redeliveryRequest{msgIds}
+       pc.eventsCh <- &redeliveryRequest{msgIDs}
 
-       iMsgIds := make([]MessageID, len(msgIds))
-       for i := range iMsgIds {
-               iMsgIds[i] = &msgIds[i]
+       iMsgIDs := make([]MessageID, len(msgIDs))
+       for i := range iMsgIDs {
+               iMsgIDs[i] = &msgIDs[i]
        }
-       pc.options.interceptors.OnNegativeAcksSend(pc.parentConsumer, iMsgIds)
+       pc.options.interceptors.OnNegativeAcksSend(pc.parentConsumer, iMsgIDs)
 }
 
 func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
@@ -805,14 +805,14 @@ func (pc *partitionConsumer) internalRedeliver(req 
*redeliveryRequest) {
                pc.log.WithField("state", state).Error("Failed to redeliver 
closing or closed consumer")
                return
        }
-       msgIds := req.msgIds
-       pc.log.Debug("Request redelivery after negative ack for messages", 
msgIds)
+       msgIDs := req.msgIDs
+       pc.log.Debug("Request redelivery after negative ack for messages", 
msgIDs)
 
-       msgIDDataList := make([]*pb.MessageIdData, len(msgIds))
-       for i := 0; i < len(msgIds); i++ {
+       msgIDDataList := make([]*pb.MessageIdData, len(msgIDs))
+       for i := 0; i < len(msgIDs); i++ {
                msgIDDataList[i] = &pb.MessageIdData{
-                       LedgerId: proto.Uint64(uint64(msgIds[i].ledgerID)),
-                       EntryId:  proto.Uint64(uint64(msgIds[i].entryID)),
+                       LedgerId: proto.Uint64(uint64(msgIDs[i].ledgerID)),
+                       EntryId:  proto.Uint64(uint64(msgIDs[i].entryID)),
                }
        }
 
@@ -1560,7 +1560,7 @@ type closeRequest struct {
 }
 
 type redeliveryRequest struct {
-       msgIds []messageID
+       msgIDs []messageID
 }
 
 type getLastMsgIDRequest struct {
@@ -1906,10 +1906,9 @@ func (pc *partitionConsumer) clearReceiverQueue() 
*trackingMessageID {
                // If the queue was empty we need to restart from the message 
just after the last one that has been dequeued
                // in the past
                return pc.lastDequeuedMsg
-       } else {
-               // No message was received or dequeued by this consumer. Next 
message would still be the startMessageId
-               return pc.startMessageID.get()
        }
+       // No message was received or dequeued by this consumer. Next message 
would still be the startMessageId
+       return pc.startMessageID.get()
 }
 
 func getPreviousMessage(mid *trackingMessageID) *trackingMessageID {
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index a4a8d995..37cc9632 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -982,7 +982,7 @@ func TestConsumerBatchCumulativeAck(t *testing.T) {
                wg.Add(1)
                producer.SendAsync(ctx, &ProducerMessage{
                        Payload: []byte(fmt.Sprintf("msg-content-%d", i))},
-                       func(id MessageID, producerMessage *ProducerMessage, e 
error) {
+                       func(_ MessageID, _ *ProducerMessage, e error) {
                                assert.NoError(t, e)
                                wg.Done()
                        })
@@ -998,7 +998,7 @@ func TestConsumerBatchCumulativeAck(t *testing.T) {
                wg.Add(1)
                producer.SendAsync(ctx, &ProducerMessage{
                        Payload: []byte(fmt.Sprintf("msg-content-%d", i))},
-                       func(id MessageID, producerMessage *ProducerMessage, e 
error) {
+                       func(_ MessageID, _ *ProducerMessage, e error) {
                                assert.NoError(t, e)
                                wg.Done()
                        })
@@ -2301,7 +2301,7 @@ func TestConsumerAddTopicPartitions(t *testing.T) {
        // create producer
        producer, err := client.CreateProducer(ProducerOptions{
                Topic: topic,
-               MessageRouter: func(msg *ProducerMessage, topicMetadata 
TopicMetadata) int {
+               MessageRouter: func(msg *ProducerMessage, _ TopicMetadata) int {
                        // The message key will contain the partition id where 
to route
                        i, err := strconv.Atoi(msg.Key)
                        assert.NoError(t, err)
@@ -2422,11 +2422,11 @@ func TestProducerName(t *testing.T) {
 
 type noopConsumerInterceptor struct{}
 
-func (noopConsumerInterceptor) BeforeConsume(message ConsumerMessage) {}
+func (noopConsumerInterceptor) BeforeConsume(_ ConsumerMessage) {}
 
-func (noopConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID 
MessageID) {}
+func (noopConsumerInterceptor) OnAcknowledge(_ Consumer, _ MessageID) {}
 
-func (noopConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs 
[]MessageID) {}
+func (noopConsumerInterceptor) OnNegativeAcksSend(_ Consumer, _ []MessageID) {}
 
 // copyPropertyInterceptor copy all keys in message properties map and add a 
suffix
 type copyPropertyInterceptor struct {
@@ -2435,31 +2435,31 @@ type copyPropertyInterceptor struct {
 
 func (x copyPropertyInterceptor) BeforeConsume(message ConsumerMessage) {
        properties := message.Properties()
-       copy := make(map[string]string, len(properties))
+       cp := make(map[string]string, len(properties))
        for k, v := range properties {
-               copy[k+x.suffix] = v
+               cp[k+x.suffix] = v
        }
-       for ck, v := range copy {
+       for ck, v := range cp {
                properties[ck] = v
        }
 }
 
-func (copyPropertyInterceptor) OnAcknowledge(consumer Consumer, msgID 
MessageID) {}
+func (copyPropertyInterceptor) OnAcknowledge(_ Consumer, _ MessageID) {}
 
-func (copyPropertyInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs 
[]MessageID) {}
+func (copyPropertyInterceptor) OnNegativeAcksSend(_ Consumer, _ []MessageID) {}
 
 type metricConsumerInterceptor struct {
        ackn  int32
        nackn int32
 }
 
-func (x *metricConsumerInterceptor) BeforeConsume(message ConsumerMessage) {}
+func (x *metricConsumerInterceptor) BeforeConsume(_ ConsumerMessage) {}
 
-func (x *metricConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID 
MessageID) {
+func (x *metricConsumerInterceptor) OnAcknowledge(_ Consumer, _ MessageID) {
        atomic.AddInt32(&x.ackn, 1)
 }
 
-func (x *metricConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, 
msgIDs []MessageID) {
+func (x *metricConsumerInterceptor) OnNegativeAcksSend(_ Consumer, msgIDs 
[]MessageID) {
        atomic.AddInt32(&x.nackn, int32(len(msgIDs)))
 }
 
@@ -2512,7 +2512,7 @@ func TestConsumerWithInterceptors(t *testing.T) {
                }
        }
 
-       var nackIds []MessageID
+       var nackIDs []MessageID
        // receive 10 messages
        for i := 0; i < 10; i++ {
                msg, err := consumer.Receive(context.Background())
@@ -2533,13 +2533,13 @@ func TestConsumerWithInterceptors(t *testing.T) {
                if i%2 == 0 {
                        consumer.Ack(msg)
                } else {
-                       nackIds = append(nackIds, msg.ID())
+                       nackIDs = append(nackIDs, msg.ID())
                }
        }
        assert.Equal(t, int32(5), atomic.LoadInt32(&metric.ackn))
 
-       for i := range nackIds {
-               consumer.NackID(nackIds[i])
+       for i := range nackIDs {
+               consumer.NackID(nackIDs[i])
        }
 
        // receive 5 nack messages
@@ -2633,7 +2633,7 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t 
*testing.T) {
                        producer.SendAsync(ctx, &ProducerMessage{
                                Key:     k,
                                Payload: []byte(fmt.Sprintf("value-%d", i)),
-                       }, func(id MessageID, producerMessage *ProducerMessage, 
err error) {
+                       }, func(_ MessageID, _ *ProducerMessage, err error) {
                                assert.Nil(t, err)
                        },
                        )
@@ -2728,7 +2728,7 @@ func 
TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
                        producer.SendAsync(ctx, &ProducerMessage{
                                Key:     k,
                                Payload: []byte(fmt.Sprintf("value-%d", i)),
-                       }, func(id MessageID, producerMessage *ProducerMessage, 
err error) {
+                       }, func(_ MessageID, _ *ProducerMessage, err error) {
                                assert.Nil(t, err)
                        },
                        )
@@ -2762,7 +2762,7 @@ func 
TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
                                Key:         u.String(),
                                OrderingKey: k,
                                Payload:     []byte(fmt.Sprintf("value-%d", i)),
-                       }, func(id MessageID, producerMessage *ProducerMessage, 
err error) {
+                       }, func(_ MessageID, _ *ProducerMessage, err error) {
                                assert.Nil(t, err)
                        },
                        )
@@ -3511,12 +3511,12 @@ func NewEncKeyReader(publicKeyPath, privateKeyPath 
string) *EncKeyReader {
 }
 
 // GetPublicKey read public key from the given path
-func (d *EncKeyReader) PublicKey(keyName string, keyMeta map[string]string) 
(*crypto.EncryptionKeyInfo, error) {
+func (d *EncKeyReader) PublicKey(keyName string, _ map[string]string) 
(*crypto.EncryptionKeyInfo, error) {
        return readKey(keyName, d.publicKeyPath, d.metaMap)
 }
 
 // GetPrivateKey read private key from the given path
-func (d *EncKeyReader) PrivateKey(keyName string, keyMeta map[string]string) 
(*crypto.EncryptionKeyInfo, error) {
+func (d *EncKeyReader) PrivateKey(keyName string, _ map[string]string) 
(*crypto.EncryptionKeyInfo, error) {
        return readKey(keyName, d.privateKeyPath, d.metaMap)
 }
 
@@ -4022,31 +4022,31 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse 
bool, cumulative bool, o
        for i := 0; i < BatchingMaxSize; i++ {
                producer.SendAsync(context.Background(), &ProducerMessage{
                        Payload: []byte(fmt.Sprintf("msg-%d", i)),
-               }, func(id MessageID, producerMessage *ProducerMessage, err 
error) {
+               }, func(id MessageID, _ *ProducerMessage, err error) {
                        assert.Nil(t, err)
                        log.Printf("Sent to %v:%d:%d", id, id.BatchIdx(), 
id.BatchSize())
                })
        }
        assert.Nil(t, producer.FlushWithCtx(context.Background()))
 
-       msgIds := make([]MessageID, BatchingMaxSize)
+       msgIDs := make([]MessageID, BatchingMaxSize)
        for i := 0; i < BatchingMaxSize; i++ {
                message, err := consumer.Receive(context.Background())
                assert.Nil(t, err)
-               msgIds[i] = message.ID()
+               msgIDs[i] = message.ID()
                log.Printf("Received %v from %v:%d:%d", 
string(message.Payload()), message.ID(),
                        message.ID().BatchIdx(), message.ID().BatchSize())
        }
 
        // Acknowledge half of the messages
        if cumulative {
-               msgID := msgIds[BatchingMaxSize/2-1]
+               msgID := msgIDs[BatchingMaxSize/2-1]
                err := consumer.AckIDCumulative(msgID)
                assert.Nil(t, err)
                log.Printf("Acknowledge %v:%d cumulatively\n", msgID, 
msgID.BatchIdx())
        } else {
                for i := 0; i < BatchingMaxSize; i++ {
-                       msgID := msgIds[i]
+                       msgID := msgIDs[i]
                        if i%2 == 0 {
                                consumer.AckID(msgID)
                                log.Printf("Acknowledge %v:%d\n", msgID, 
msgID.BatchIdx())
@@ -4066,17 +4066,17 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse 
bool, cumulative bool, o
                        index = i + BatchingMaxSize/2
                }
                assert.Equal(t, []byte(fmt.Sprintf("msg-%d", index)), 
message.Payload())
-               assert.Equal(t, msgIds[index].BatchIdx(), 
message.ID().BatchIdx())
+               assert.Equal(t, msgIDs[index].BatchIdx(), 
message.ID().BatchIdx())
                // We should not acknowledge message.ID() here because 
message.ID() shares a different
                // tracker with msgIds
                if !cumulative {
-                       msgID := msgIds[index]
+                       msgID := msgIDs[index]
                        consumer.AckID(msgID)
                        log.Printf("Acknowledge %v:%d\n", msgID, 
msgID.BatchIdx())
                }
        }
        if cumulative {
-               msgID := msgIds[BatchingMaxSize-1]
+               msgID := msgIDs[BatchingMaxSize-1]
                err := consumer.AckIDCumulative(msgID)
                assert.Nil(t, err)
                log.Printf("Acknowledge %v:%d cumulatively\n", msgID, 
msgID.BatchIdx())
@@ -4167,7 +4167,7 @@ func TestConsumerWithAutoScaledQueueReceive(t *testing.T) 
{
                p.SendAsync(
                        context.Background(),
                        &ProducerMessage{Payload: []byte("hello")},
-                       func(id MessageID, producerMessage *ProducerMessage, 
err error) {
+                       func(_ MessageID, _ *ProducerMessage, _ error) {
                        },
                )
        }
@@ -4333,7 +4333,7 @@ func TestConsumerMemoryLimit(t *testing.T) {
                p1.SendAsync(
                        context.Background(),
                        &ProducerMessage{Payload: createTestMessagePayload(1)},
-                       func(id MessageID, producerMessage *ProducerMessage, 
err error) {
+                       func(_ MessageID, _ *ProducerMessage, _ error) {
                        },
                )
        }
@@ -4373,7 +4373,7 @@ func TestConsumerMemoryLimit(t *testing.T) {
                p1.SendAsync(
                        context.Background(),
                        &ProducerMessage{Payload: createTestMessagePayload(1)},
-                       func(id MessageID, producerMessage *ProducerMessage, 
err error) {
+                       func(_ MessageID, _ *ProducerMessage, _ error) {
                        },
                )
        }
@@ -4456,7 +4456,7 @@ func TestMultiConsumerMemoryLimit(t *testing.T) {
                p1.SendAsync(
                        context.Background(),
                        &ProducerMessage{Payload: createTestMessagePayload(1)},
-                       func(id MessageID, producerMessage *ProducerMessage, 
err error) {
+                       func(_ MessageID, _ *ProducerMessage, _ error) {
                        },
                )
        }
diff --git a/pulsar/crypto/default_message_crypto.go 
b/pulsar/crypto/default_message_crypto.go
index 74e4592d..2239fa39 100644
--- a/pulsar/crypto/default_message_crypto.go
+++ b/pulsar/crypto/default_message_crypto.go
@@ -169,7 +169,7 @@ func (d *DefaultMessageCrypto) Encrypt(encKeys []string,
                        // we should never reach here
                        msg := fmt.Sprintf("%v Failed to find encrypted Data 
key for key %v", d.logCtx, keyName)
                        d.logger.Errorf(msg)
-                       return nil, fmt.Errorf(msg)
+                       return nil, errors.New(msg)
                }
 
        }
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 6b13b329..00c7e03b 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -126,7 +126,7 @@ func (r *dlqRouter) run() {
                                Properties:          properties,
                                EventTime:           msg.EventTime(),
                                ReplicationClusters: msg.replicationClusters,
-                       }, func(messageID MessageID, producerMessage 
*ProducerMessage, err error) {
+                       }, func(_ MessageID, _ *ProducerMessage, err error) {
                                if err == nil {
                                        r.log.WithField("msgID", 
msgID).Debug("Succeed to send message to DLQ")
                                        // The Producer ack might be coming 
from the connection go-routine that
@@ -185,9 +185,8 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
                        r.log.WithError(err).Error("Failed to create DLQ 
producer")
                        time.Sleep(bo.Next())
                        continue
-               } else {
-                       r.producer = producer
-                       return producer
                }
+               r.producer = producer
+               return producer
        }
 }
diff --git a/pulsar/helper_for_test.go b/pulsar/helper_for_test.go
index 426855b2..1cb3f6cb 100644
--- a/pulsar/helper_for_test.go
+++ b/pulsar/helper_for_test.go
@@ -70,7 +70,7 @@ func httpDelete(requestPaths ...string) error {
        var errs error
        for _, requestPath := range requestPaths {
                if err := httpDo(http.MethodDelete, requestPath, nil, nil); err 
!= nil {
-                       errs = pkgerrors.Wrapf(err, "unable to delete url: 
%s"+requestPath)
+                       errs = pkgerrors.Wrapf(err, "unable to delete url: %s", 
requestPath)
                }
        }
        return errs
@@ -186,5 +186,5 @@ func retryAssert(t assert.TestingT, times int, milliseconds 
int, update func(),
 
 type fakeAssertT struct{}
 
-func (fa fakeAssertT) Errorf(format string, args ...interface{}) {
+func (fa fakeAssertT) Errorf(_ string, _ ...interface{}) {
 }
diff --git a/pulsar/internal/channel_cond_test.go 
b/pulsar/internal/channel_cond_test.go
index a73d44ea..93a04084 100644
--- a/pulsar/internal/channel_cond_test.go
+++ b/pulsar/internal/channel_cond_test.go
@@ -24,7 +24,7 @@ import (
        "time"
 )
 
-func TestChCond(t *testing.T) {
+func TestChCond(_ *testing.T) {
        cond := newCond(&sync.Mutex{})
        wg := sync.WaitGroup{}
        wg.Add(1)
@@ -39,7 +39,7 @@ func TestChCond(t *testing.T) {
        wg.Wait()
 }
 
-func TestChCondWithContext(t *testing.T) {
+func TestChCondWithContext(_ *testing.T) {
        cond := newCond(&sync.Mutex{})
        wg := sync.WaitGroup{}
        ctx, cancel := context.WithCancel(context.Background())
diff --git a/pulsar/internal/compression/noop.go 
b/pulsar/internal/compression/noop.go
index 78acb52d..1607c85f 100644
--- a/pulsar/internal/compression/noop.go
+++ b/pulsar/internal/compression/noop.go
@@ -42,7 +42,7 @@ func (noopProvider) Compress(dst, src []byte) []byte {
        return dst[:len(src)]
 }
 
-func (noopProvider) Decompress(dst, src []byte, originalSize int) ([]byte, 
error) {
+func (noopProvider) Decompress(dst, src []byte, _ int) ([]byte, error) {
        if dst == nil {
                dst = make([]byte, len(src))
        }
diff --git a/pulsar/internal/compression/zstd_cgo.go 
b/pulsar/internal/compression/zstd_cgo.go
index dde54ae2..8cad7827 100644
--- a/pulsar/internal/compression/zstd_cgo.go
+++ b/pulsar/internal/compression/zstd_cgo.go
@@ -75,7 +75,7 @@ func (z *zstdCGoProvider) Compress(dst, src []byte) []byte {
        return out
 }
 
-func (z *zstdCGoProvider) Decompress(dst, src []byte, originalSize int) 
([]byte, error) {
+func (z *zstdCGoProvider) Decompress(dst, src []byte, _ int) ([]byte, error) {
        ctx := z.ctxPool.Get().(zstd.Ctx)
        defer z.ctxPool.Put(ctx)
        return ctx.Decompress(dst, src)
diff --git a/pulsar/internal/compression/zstd_go.go 
b/pulsar/internal/compression/zstd_go.go
index ae850783..77389103 100644
--- a/pulsar/internal/compression/zstd_go.go
+++ b/pulsar/internal/compression/zstd_go.go
@@ -58,7 +58,7 @@ func (p *zstdProvider) Compress(dst, src []byte) []byte {
        return p.encoder.EncodeAll(src, dst)
 }
 
-func (p *zstdProvider) Decompress(dst, src []byte, originalSize int) ([]byte, 
error) {
+func (p *zstdProvider) Decompress(dst, src []byte, _ int) ([]byte, error) {
        return p.decoder.DecodeAll(src, dst)
 }
 
diff --git a/pulsar/internal/crypto/consumer_decryptor.go 
b/pulsar/internal/crypto/consumer_decryptor.go
index bbc1f9b1..8db89027 100644
--- a/pulsar/internal/crypto/consumer_decryptor.go
+++ b/pulsar/internal/crypto/consumer_decryptor.go
@@ -42,7 +42,7 @@ func NewConsumerDecryptor(keyReader crypto.KeyReader,
 }
 
 func (d *consumerDecryptor) Decrypt(payload []byte,
-       msgID *pb.MessageIdData,
+       _ *pb.MessageIdData,
        msgMetadata *pb.MessageMetadata) ([]byte, error) {
        // encryption keys are not present in message metadta, no need decrypt 
the payload
        if len(msgMetadata.GetEncryptionKeys()) == 0 {
diff --git a/pulsar/internal/crypto/noop_decryptor.go 
b/pulsar/internal/crypto/noop_decryptor.go
index c049c472..166b1541 100644
--- a/pulsar/internal/crypto/noop_decryptor.go
+++ b/pulsar/internal/crypto/noop_decryptor.go
@@ -31,7 +31,7 @@ func NewNoopDecryptor() Decryptor {
 
 // Decrypt noop decryptor
 func (d *noopDecryptor) Decrypt(payload []byte,
-       msgID *pb.MessageIdData,
+       _ *pb.MessageIdData,
        msgMetadata *pb.MessageMetadata) ([]byte, error) {
        if len(msgMetadata.GetEncryptionKeys()) > 0 {
                return payload, fmt.Errorf("incoming message payload is 
encrypted, consumer is not configured to decrypt")
diff --git a/pulsar/internal/crypto/noop_encryptor.go 
b/pulsar/internal/crypto/noop_encryptor.go
index 4512e7bd..7c52755f 100644
--- a/pulsar/internal/crypto/noop_encryptor.go
+++ b/pulsar/internal/crypto/noop_encryptor.go
@@ -28,6 +28,6 @@ func NewNoopEncryptor() Encryptor {
 }
 
 // Encrypt Noop ecryptor
-func (e *noopEncryptor) Encrypt(data []byte, msgMetadata *pb.MessageMetadata) 
([]byte, error) {
+func (e *noopEncryptor) Encrypt(data []byte, _ *pb.MessageMetadata) ([]byte, 
error) {
        return data, nil
 }
diff --git a/pulsar/internal/http_client.go b/pulsar/internal/http_client.go
index f1420aea..632d5a4f 100644
--- a/pulsar/internal/http_client.go
+++ b/pulsar/internal/http_client.go
@@ -159,9 +159,7 @@ func (c *httpClient) Get(endpoint string, obj interface{}, 
params map[string]str
                        c.log.Debugf("Retrying httpRequest in {%v} with timeout 
in {%v}", retryTime, c.requestTimeout)
                        time.Sleep(retryTime)
                        _, err = c.GetWithQueryParams(endpoint, obj, params, 
true)
-                       if _, ok := err.(*url.Error); ok {
-                               continue
-                       } else {
+                       if _, ok := err.(*url.Error); !ok {
                                // We either succeeded or encountered a non 
connection error
                                break
                        }
diff --git a/pulsar/internal/lookup_service.go 
b/pulsar/internal/lookup_service.go
index 81f496b3..6c07e7d2 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -367,7 +367,7 @@ func (h *httpLookupService) GetTopicsOfNamespace(namespace 
string, mode GetTopic
        return topics, nil
 }
 
-func (h *httpLookupService) GetSchema(topic string, schemaVersion []byte) 
(schema *pb.Schema, err error) {
+func (h *httpLookupService) GetSchema(_ string, _ []byte) (schema *pb.Schema, 
err error) {
        return nil, errors.New("GetSchema is not supported by 
httpLookupService")
 }
 
diff --git a/pulsar/internal/lookup_service_test.go 
b/pulsar/internal/lookup_service_test.go
index 349d1929..fd08898a 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -57,7 +57,7 @@ func (c *mockedLookupRPCClient) NewConsumerID() uint64 {
        return 1
 }
 
-func (c *mockedLookupRPCClient) RequestToAnyBroker(requestID uint64, cmdType 
pb.BaseCommand_Type,
+func (c *mockedLookupRPCClient) RequestToAnyBroker(_ uint64, cmdType 
pb.BaseCommand_Type,
        message proto.Message) (*RPCResult, error) {
        assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP)
 
@@ -77,16 +77,16 @@ func (c *mockedLookupRPCClient) 
RequestToAnyBroker(requestID uint64, cmdType pb.
        }, nil
 }
 
-func (c *mockedLookupRPCClient) RequestToHost(serviceNameResolver 
*ServiceNameResolver, requestID uint64,
+func (c *mockedLookupRPCClient) RequestToHost(_ *ServiceNameResolver, 
requestID uint64,
        cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) 
{
        return c.RequestToAnyBroker(requestID, cmdType, message)
 }
 
-func (c *mockedLookupRPCClient) LookupService(URL string) LookupService {
+func (c *mockedLookupRPCClient) LookupService(_ string) LookupService {
        return nil
 }
 
-func (c *mockedLookupRPCClient) Request(logicalAddr *url.URL, physicalAddr 
*url.URL, requestID uint64,
+func (c *mockedLookupRPCClient) Request(logicalAddr *url.URL, physicalAddr 
*url.URL, _ uint64,
        cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) 
{
        assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP)
        expectedRequest := &c.expectedRequests[0]
@@ -108,14 +108,14 @@ func (c *mockedLookupRPCClient) Request(logicalAddr 
*url.URL, physicalAddr *url.
        }, nil
 }
 
-func (c *mockedLookupRPCClient) RequestOnCnx(cnx Connection, requestID uint64, 
cmdType pb.BaseCommand_Type,
-       message proto.Message) (*RPCResult, error) {
+func (c *mockedLookupRPCClient) RequestOnCnx(_ Connection, _ uint64, _ 
pb.BaseCommand_Type,
+       _ proto.Message) (*RPCResult, error) {
        assert.Fail(c.t, "Shouldn't be called")
        return nil, nil
 }
 
-func (c *mockedLookupRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType 
pb.BaseCommand_Type,
-       message proto.Message) error {
+func (c *mockedLookupRPCClient) RequestOnCnxNoWait(_ Connection, _ 
pb.BaseCommand_Type,
+       _ proto.Message) error {
        assert.Fail(c.t, "Shouldn't be called")
        return nil
 }
@@ -458,7 +458,7 @@ func (m mockedPartitionedTopicMetadataRPCClient) 
NewConsumerID() uint64 {
 }
 
 func (m mockedPartitionedTopicMetadataRPCClient) RequestToAnyBroker(requestID 
uint64, cmdType pb.BaseCommand_Type,
-       message proto.Message) (*RPCResult, error) {
+       _ proto.Message) (*RPCResult, error) {
        assert.Equal(m.t, cmdType, pb.BaseCommand_PARTITIONED_METADATA)
 
        expectedRequest := &m.expectedRequests[0]
@@ -477,30 +477,30 @@ func (m mockedPartitionedTopicMetadataRPCClient) 
RequestToAnyBroker(requestID ui
        }, nil
 }
 
-func (m mockedPartitionedTopicMetadataRPCClient) Request(logicalAddr *url.URL, 
physicalAddr *url.URL, requestID uint64,
-       cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) 
{
+func (m mockedPartitionedTopicMetadataRPCClient) Request(_ *url.URL, _ 
*url.URL, _ uint64,
+       _ pb.BaseCommand_Type, _ proto.Message) (*RPCResult, error) {
        assert.Fail(m.t, "Shouldn't be called")
        return nil, nil
 }
 
-func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnxNoWait(cnx 
Connection, cmdType pb.BaseCommand_Type,
-       message proto.Message) error {
+func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnxNoWait(_ 
Connection, _ pb.BaseCommand_Type,
+       _ proto.Message) error {
        assert.Fail(m.t, "Shouldn't be called")
        return nil
 }
 
-func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnx(cnx Connection, 
requestID uint64,
-       cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) 
{
+func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnx(_ Connection, _ 
uint64,
+       _ pb.BaseCommand_Type, _ proto.Message) (*RPCResult, error) {
        assert.Fail(m.t, "Shouldn't be called")
        return nil, nil
 }
 
-func (m *mockedPartitionedTopicMetadataRPCClient) 
RequestToHost(serviceNameResolver *ServiceNameResolver,
+func (m *mockedPartitionedTopicMetadataRPCClient) RequestToHost(_ 
*ServiceNameResolver,
        requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) 
(*RPCResult, error) {
        return m.RequestToAnyBroker(requestID, cmdType, message)
 }
 
-func (m *mockedPartitionedTopicMetadataRPCClient) LookupService(URL string) 
LookupService {
+func (m *mockedPartitionedTopicMetadataRPCClient) LookupService(_ string) 
LookupService {
        return nil
 }
 
@@ -575,7 +575,7 @@ type MockHTTPClient struct {
 
 func (c *MockHTTPClient) Close() {}
 
-func (c *MockHTTPClient) Get(endpoint string, obj interface{}, params 
map[string]string) error {
+func (c *MockHTTPClient) Get(endpoint string, obj interface{}, _ 
map[string]string) error {
        if strings.Contains(endpoint, HTTPLookupServiceBasePathV1) || 
strings.Contains(endpoint,
                HTTPLookupServiceBasePathV2) {
                return mockHTTPGetLookupResult(obj)
diff --git a/pulsar/internal/pulsartracing/consumer_interceptor.go 
b/pulsar/internal/pulsartracing/consumer_interceptor.go
index 3969d45d..3b91e7cb 100644
--- a/pulsar/internal/pulsartracing/consumer_interceptor.go
+++ b/pulsar/internal/pulsartracing/consumer_interceptor.go
@@ -33,9 +33,9 @@ func (t *ConsumerInterceptor) BeforeConsume(message 
pulsar.ConsumerMessage) {
        buildAndInjectChildSpan(message).Finish()
 }
 
-func (t *ConsumerInterceptor) OnAcknowledge(consumer pulsar.Consumer, msgID 
pulsar.MessageID) {}
+func (t *ConsumerInterceptor) OnAcknowledge(_ pulsar.Consumer, _ 
pulsar.MessageID) {}
 
-func (t *ConsumerInterceptor) OnNegativeAcksSend(consumer pulsar.Consumer, 
msgIDs []pulsar.MessageID) {
+func (t *ConsumerInterceptor) OnNegativeAcksSend(_ pulsar.Consumer, _ 
[]pulsar.MessageID) {
 }
 
 func buildAndInjectChildSpan(message pulsar.ConsumerMessage) opentracing.Span {
diff --git a/pulsar/internal/pulsartracing/consumer_interceptor_test.go 
b/pulsar/internal/pulsartracing/consumer_interceptor_test.go
index 05953cf1..1fa1bf0d 100644
--- a/pulsar/internal/pulsartracing/consumer_interceptor_test.go
+++ b/pulsar/internal/pulsartracing/consumer_interceptor_test.go
@@ -52,7 +52,7 @@ func (c *mockConsumer) Subscription() string {
        return ""
 }
 
-func (c *mockConsumer) AckWithTxn(msg pulsar.Message, txn pulsar.Transaction) 
error {
+func (c *mockConsumer) AckWithTxn(_ pulsar.Message, _ pulsar.Transaction) 
error {
        return nil
 }
 
@@ -63,7 +63,7 @@ func (c *mockConsumer) UnsubscribeForce() error {
        return nil
 }
 
-func (c *mockConsumer) Receive(ctx context.Context) (message pulsar.Message, 
err error) {
+func (c *mockConsumer) Receive(_ context.Context) (message pulsar.Message, err 
error) {
        return nil, nil
 }
 
@@ -71,39 +71,39 @@ func (c *mockConsumer) Chan() <-chan pulsar.ConsumerMessage 
{
        return nil
 }
 
-func (c *mockConsumer) Ack(msg pulsar.Message) error {
+func (c *mockConsumer) Ack(_ pulsar.Message) error {
        return nil
 }
 
-func (c *mockConsumer) AckID(msgID pulsar.MessageID) error {
+func (c *mockConsumer) AckID(_ pulsar.MessageID) error {
        return nil
 }
 
-func (c *mockConsumer) AckCumulative(msg pulsar.Message) error {
+func (c *mockConsumer) AckCumulative(_ pulsar.Message) error {
        return nil
 }
 
-func (c *mockConsumer) AckIDCumulative(msgID pulsar.MessageID) error {
+func (c *mockConsumer) AckIDCumulative(_ pulsar.MessageID) error {
        return nil
 }
 
-func (c *mockConsumer) ReconsumeLater(msg pulsar.Message, delay time.Duration) 
{}
+func (c *mockConsumer) ReconsumeLater(_ pulsar.Message, _ time.Duration) {}
 
-func (c *mockConsumer) ReconsumeLaterWithCustomProperties(msg pulsar.Message, 
customProperties map[string]string,
-       delay time.Duration) {
+func (c *mockConsumer) ReconsumeLaterWithCustomProperties(_ pulsar.Message, _ 
map[string]string,
+       _ time.Duration) {
 }
 
-func (c *mockConsumer) Nack(msg pulsar.Message) {}
+func (c *mockConsumer) Nack(_ pulsar.Message) {}
 
-func (c *mockConsumer) NackID(msgID pulsar.MessageID) {}
+func (c *mockConsumer) NackID(_ pulsar.MessageID) {}
 
 func (c *mockConsumer) Close() {}
 
-func (c *mockConsumer) Seek(msgID pulsar.MessageID) error {
+func (c *mockConsumer) Seek(_ pulsar.MessageID) error {
        return nil
 }
 
-func (c *mockConsumer) SeekByTime(time time.Time) error {
+func (c *mockConsumer) SeekByTime(_ time.Time) error {
        return nil
 }
 
diff --git a/pulsar/internal/pulsartracing/message_carrier_adaptors.go 
b/pulsar/internal/pulsartracing/message_carrier_adaptors.go
index 3b6394ba..8fcfa17e 100644
--- a/pulsar/internal/pulsartracing/message_carrier_adaptors.go
+++ b/pulsar/internal/pulsartracing/message_carrier_adaptors.go
@@ -38,14 +38,14 @@ func (a *ProducerMessageExtractAdapter) ForeachKey(handler 
func(key, val string)
        return nil
 }
 
-func (a *ProducerMessageExtractAdapter) Set(key, val string) {}
+func (a *ProducerMessageExtractAdapter) Set(_, _ string) {}
 
 // ProducerMessageInjectAdapter Implements TextMap Interface
 type ProducerMessageInjectAdapter struct {
        message *pulsar.ProducerMessage
 }
 
-func (a *ProducerMessageInjectAdapter) ForeachKey(handler func(key, val 
string) error) error {
+func (a *ProducerMessageInjectAdapter) ForeachKey(_ func(_, _ string) error) 
error {
        return errors.New("iterator should never be used with Tracer.inject()")
 }
 
@@ -68,14 +68,14 @@ func (a *ConsumerMessageExtractAdapter) ForeachKey(handler 
func(key, val string)
        return nil
 }
 
-func (a *ConsumerMessageExtractAdapter) Set(key, val string) {}
+func (a *ConsumerMessageExtractAdapter) Set(_, _ string) {}
 
 // ConsumerMessageInjectAdapter Implements TextMap Interface
 type ConsumerMessageInjectAdapter struct {
        message pulsar.ConsumerMessage
 }
 
-func (a *ConsumerMessageInjectAdapter) ForeachKey(handler func(key, val 
string) error) error {
+func (a *ConsumerMessageInjectAdapter) ForeachKey(_ func(_, _ string) error) 
error {
        return errors.New("iterator should never be used with tracer.inject()")
 }
 
diff --git a/pulsar/internal/pulsartracing/message_carrier_util_test.go 
b/pulsar/internal/pulsartracing/message_carrier_util_test.go
index df78ae2f..90658c1a 100644
--- a/pulsar/internal/pulsartracing/message_carrier_util_test.go
+++ b/pulsar/internal/pulsartracing/message_carrier_util_test.go
@@ -112,7 +112,7 @@ func (msg *mockConsumerMessage) GetReplicatedFrom() string {
        return ""
 }
 
-func (msg *mockConsumerMessage) GetSchemaValue(v interface{}) error {
+func (msg *mockConsumerMessage) GetSchemaValue(_ interface{}) error {
        return nil
 }
 
diff --git a/pulsar/internal/pulsartracing/producer_interceptor.go 
b/pulsar/internal/pulsartracing/producer_interceptor.go
index 6c7728cf..77cb8bf9 100644
--- a/pulsar/internal/pulsartracing/producer_interceptor.go
+++ b/pulsar/internal/pulsartracing/producer_interceptor.go
@@ -33,9 +33,9 @@ func (t *ProducerInterceptor) BeforeSend(producer 
pulsar.Producer, message *puls
        buildAndInjectSpan(message, producer).Finish()
 }
 
-func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer,
-       message *pulsar.ProducerMessage,
-       msgID pulsar.MessageID) {
+func (t *ProducerInterceptor) OnSendAcknowledgement(_ pulsar.Producer,
+       _ *pulsar.ProducerMessage,
+       _ pulsar.MessageID) {
 }
 
 func buildAndInjectSpan(message *pulsar.ProducerMessage, producer 
pulsar.Producer) opentracing.Span {
diff --git a/pulsar/internal/pulsartracing/producer_interceptor_test.go 
b/pulsar/internal/pulsartracing/producer_interceptor_test.go
index 1c2c712f..74890f73 100644
--- a/pulsar/internal/pulsartracing/producer_interceptor_test.go
+++ b/pulsar/internal/pulsartracing/producer_interceptor_test.go
@@ -67,7 +67,7 @@ func (p *mockProducer) Flush() error {
        return nil
 }
 
-func (p *mockProducer) FlushWithCtx(ctx context.Context) error {
+func (p *mockProducer) FlushWithCtx(_ context.Context) error {
        return nil
 }
 
diff --git a/pulsar/log/log.go b/pulsar/log/log.go
index 7ed52317..d68c687d 100644
--- a/pulsar/log/log.go
+++ b/pulsar/log/log.go
@@ -24,29 +24,29 @@ func DefaultNopLogger() Logger {
 
 type nopLogger struct{}
 
-func (l nopLogger) SubLogger(fields Fields) Logger                 { return l }
-func (l nopLogger) WithFields(fields Fields) Entry                 { return 
nopEntry{} }
-func (l nopLogger) WithField(name string, value interface{}) Entry { return 
nopEntry{} }
-func (l nopLogger) WithError(err error) Entry                      { return 
nopEntry{} }
-func (l nopLogger) Debug(args ...interface{})                      {}
-func (l nopLogger) Info(args ...interface{})                       {}
-func (l nopLogger) Warn(args ...interface{})                       {}
-func (l nopLogger) Error(args ...interface{})                      {}
-func (l nopLogger) Debugf(format string, args ...interface{})      {}
-func (l nopLogger) Infof(format string, args ...interface{})       {}
-func (l nopLogger) Warnf(format string, args ...interface{})       {}
-func (l nopLogger) Errorf(format string, args ...interface{})      {}
+func (l nopLogger) SubLogger(_ Fields) Logger               { return l }
+func (l nopLogger) WithFields(_ Fields) Entry               { return 
nopEntry{} }
+func (l nopLogger) WithField(_ string, _ interface{}) Entry { return 
nopEntry{} }
+func (l nopLogger) WithError(_ error) Entry                 { return 
nopEntry{} }
+func (l nopLogger) Debug(_ ...interface{})                  {}
+func (l nopLogger) Info(_ ...interface{})                   {}
+func (l nopLogger) Warn(_ ...interface{})                   {}
+func (l nopLogger) Error(_ ...interface{})                  {}
+func (l nopLogger) Debugf(_ string, _ ...interface{})       {}
+func (l nopLogger) Infof(_ string, _ ...interface{})        {}
+func (l nopLogger) Warnf(_ string, _ ...interface{})        {}
+func (l nopLogger) Errorf(_ string, _ ...interface{})       {}
 
 type nopEntry struct{}
 
-func (e nopEntry) WithFields(fields Fields) Entry                 { return 
nopEntry{} }
-func (e nopEntry) WithField(name string, value interface{}) Entry { return 
nopEntry{} }
+func (e nopEntry) WithFields(_ Fields) Entry               { return nopEntry{} 
}
+func (e nopEntry) WithField(_ string, _ interface{}) Entry { return nopEntry{} 
}
 
-func (e nopEntry) Debug(args ...interface{})                 {}
-func (e nopEntry) Info(args ...interface{})                  {}
-func (e nopEntry) Warn(args ...interface{})                  {}
-func (e nopEntry) Error(args ...interface{})                 {}
-func (e nopEntry) Debugf(format string, args ...interface{}) {}
-func (e nopEntry) Infof(format string, args ...interface{})  {}
-func (e nopEntry) Warnf(format string, args ...interface{})  {}
-func (e nopEntry) Errorf(format string, args ...interface{}) {}
+func (e nopEntry) Debug(_ ...interface{})            {}
+func (e nopEntry) Info(_ ...interface{})             {}
+func (e nopEntry) Warn(_ ...interface{})             {}
+func (e nopEntry) Error(_ ...interface{})            {}
+func (e nopEntry) Debugf(_ string, _ ...interface{}) {}
+func (e nopEntry) Infof(_ string, _ ...interface{})  {}
+func (e nopEntry) Warnf(_ string, _ ...interface{})  {}
+func (e nopEntry) Errorf(_ string, _ ...interface{}) {}
diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go
index 58f56767..1331c7df 100644
--- a/pulsar/negative_acks_tracker.go
+++ b/pulsar/negative_acks_tracker.go
@@ -25,7 +25,7 @@ import (
 )
 
 type redeliveryConsumer interface {
-       Redeliver(msgIds []messageID)
+       Redeliver(msgIDs []messageID)
 }
 
 type negativeAcksTracker struct {
@@ -123,7 +123,7 @@ func (t *negativeAcksTracker) track() {
                case <-t.tick.C:
                        {
                                now := time.Now()
-                               msgIds := make([]messageID, 0)
+                               msgIDs := make([]messageID, 0)
 
                                t.Lock()
 
@@ -131,15 +131,15 @@ func (t *negativeAcksTracker) track() {
                                        t.log.Debugf("MsgId: %v -- targetTime: 
%v -- now: %v", msgID, targetTime, now)
                                        if targetTime.Before(now) {
                                                t.log.Debugf("Adding MsgId: 
%v", msgID)
-                                               msgIds = append(msgIds, msgID)
+                                               msgIDs = append(msgIDs, msgID)
                                                delete(t.negativeAcks, msgID)
                                        }
                                }
 
                                t.Unlock()
 
-                               if len(msgIds) > 0 {
-                                       t.rc.Redeliver(msgIds)
+                               if len(msgIDs) > 0 {
+                                       t.rc.Redeliver(msgIDs)
                                }
                        }
                }
diff --git a/pulsar/negative_acks_tracker_test.go 
b/pulsar/negative_acks_tracker_test.go
index 94934123..3f03ac44 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -56,22 +56,22 @@ func newNackMockedConsumer(nackBackoffPolicy 
NackBackoffPolicy) *nackMockedConsu
        return t
 }
 
-func (nmc *nackMockedConsumer) Redeliver(msgIds []messageID) {
+func (nmc *nackMockedConsumer) Redeliver(msgIDs []messageID) {
        nmc.lock.Lock()
        defer nmc.lock.Unlock()
        if nmc.closed {
                return
        }
-       for _, id := range msgIds {
+       for _, id := range msgIDs {
                nmc.ch <- id
        }
 }
 
-func sortMessageIds(msgIds []messageID) []messageID {
-       sort.Slice(msgIds, func(i, j int) bool {
-               return msgIds[i].ledgerID < msgIds[j].entryID
+func sortMessageIDs(msgIDs []messageID) []messageID {
+       sort.Slice(msgIDs, func(i, j int) bool {
+               return msgIDs[i].ledgerID < msgIDs[j].entryID
        })
-       return msgIds
+       return msgIDs
 }
 
 func (nmc *nackMockedConsumer) Wait() <-chan messageID {
@@ -94,17 +94,17 @@ func TestNacksTracker(t *testing.T) {
                batchIdx: 1,
        })
 
-       msgIds := make([]messageID, 0)
+       msgIDs := make([]messageID, 0)
        for id := range nmc.Wait() {
-               msgIds = append(msgIds, id)
+               msgIDs = append(msgIDs, id)
        }
-       msgIds = sortMessageIds(msgIds)
+       msgIDs = sortMessageIDs(msgIDs)
 
-       assert.Equal(t, 2, len(msgIds))
-       assert.Equal(t, int64(1), msgIds[0].ledgerID)
-       assert.Equal(t, int64(1), msgIds[0].entryID)
-       assert.Equal(t, int64(2), msgIds[1].ledgerID)
-       assert.Equal(t, int64(2), msgIds[1].entryID)
+       assert.Equal(t, 2, len(msgIDs))
+       assert.Equal(t, int64(1), msgIDs[0].ledgerID)
+       assert.Equal(t, int64(1), msgIDs[0].entryID)
+       assert.Equal(t, int64(2), msgIDs[1].ledgerID)
+       assert.Equal(t, int64(2), msgIDs[1].entryID)
 
        nacks.Close()
        // allow multiple Close without panicing
@@ -139,17 +139,17 @@ func TestNacksWithBatchesTracker(t *testing.T) {
                batchIdx: 1,
        })
 
-       msgIds := make([]messageID, 0)
+       msgIDs := make([]messageID, 0)
        for id := range nmc.Wait() {
-               msgIds = append(msgIds, id)
+               msgIDs = append(msgIDs, id)
        }
-       msgIds = sortMessageIds(msgIds)
+       msgIDs = sortMessageIDs(msgIDs)
 
-       assert.Equal(t, 2, len(msgIds))
-       assert.Equal(t, int64(1), msgIds[0].ledgerID)
-       assert.Equal(t, int64(1), msgIds[0].entryID)
-       assert.Equal(t, int64(2), msgIds[1].ledgerID)
-       assert.Equal(t, int64(2), msgIds[1].entryID)
+       assert.Equal(t, 2, len(msgIDs))
+       assert.Equal(t, int64(1), msgIDs[0].ledgerID)
+       assert.Equal(t, int64(1), msgIDs[0].entryID)
+       assert.Equal(t, int64(2), msgIDs[1].ledgerID)
+       assert.Equal(t, int64(2), msgIDs[1].entryID)
 
        nacks.Close()
 }
@@ -161,17 +161,17 @@ func TestNackBackoffTracker(t *testing.T) {
        nacks.AddMessage(new(mockMessage1))
        nacks.AddMessage(new(mockMessage2))
 
-       msgIds := make([]messageID, 0)
+       msgIDs := make([]messageID, 0)
        for id := range nmc.Wait() {
-               msgIds = append(msgIds, id)
+               msgIDs = append(msgIDs, id)
        }
-       msgIds = sortMessageIds(msgIds)
+       msgIDs = sortMessageIDs(msgIDs)
 
-       assert.Equal(t, 2, len(msgIds))
-       assert.Equal(t, int64(1), msgIds[0].ledgerID)
-       assert.Equal(t, int64(1), msgIds[0].entryID)
-       assert.Equal(t, int64(2), msgIds[1].ledgerID)
-       assert.Equal(t, int64(2), msgIds[1].entryID)
+       assert.Equal(t, 2, len(msgIDs))
+       assert.Equal(t, int64(1), msgIDs[0].ledgerID)
+       assert.Equal(t, int64(1), msgIDs[0].entryID)
+       assert.Equal(t, int64(2), msgIDs[1].ledgerID)
+       assert.Equal(t, int64(2), msgIDs[1].entryID)
 
        nacks.Close()
        // allow multiple Close without panicing
@@ -230,7 +230,7 @@ func (msg *mockMessage1) GetReplicatedFrom() string {
        return ""
 }
 
-func (msg *mockMessage1) GetSchemaValue(v interface{}) error {
+func (msg *mockMessage1) GetSchemaValue(_ interface{}) error {
        return nil
 }
 
@@ -306,7 +306,7 @@ func (msg *mockMessage2) GetReplicatedFrom() string {
        return ""
 }
 
-func (msg *mockMessage2) GetSchemaValue(v interface{}) error {
+func (msg *mockMessage2) GetSchemaValue(_ interface{}) error {
        return nil
 }
 
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index f578ee4b..b1fc3f02 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1093,7 +1093,7 @@ func (p *partitionProducer) Send(ctx context.Context, msg 
*ProducerMessage) (Mes
        isDone := uAtomic.NewBool(false)
        doneCh := make(chan struct{})
 
-       p.internalSendAsync(ctx, msg, func(ID MessageID, message 
*ProducerMessage, e error) {
+       p.internalSendAsync(ctx, msg, func(ID MessageID, _ *ProducerMessage, e 
error) {
                if isDone.CAS(false, true) {
                        err = e
                        msgID = ID
@@ -1375,59 +1375,58 @@ func (p *partitionProducer) 
ReceivedSendReceipt(response *pb.CommandSendReceipt)
                p.log.Warnf("Received ack for %v on sequenceId %v - expected: 
%v, local > remote, ignore it",
                        response.GetMessageId(), response.GetSequenceId(), 
pi.sequenceID)
                return
-       } else {
-               // The ack was indeed for the expected item in the queue, we 
can remove it and trigger the callback
-               p.pendingQueue.Poll()
+       }
+       // The ack was indeed for the expected item in the queue, we can remove 
it and trigger the callback
+       p.pendingQueue.Poll()
 
-               now := time.Now().UnixNano()
+       now := time.Now().UnixNano()
 
-               // lock the pending item while sending the requests
-               pi.Lock()
-               defer pi.Unlock()
-               
p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
-               batchSize := int32(len(pi.sendRequests))
-               for idx, i := range pi.sendRequests {
-                       sr := i.(*sendRequest)
-                       atomic.StoreInt64(&p.lastSequenceID, 
int64(pi.sequenceID))
-
-                       msgID := newMessageID(
-                               int64(response.MessageId.GetLedgerId()),
-                               int64(response.MessageId.GetEntryId()),
-                               int32(idx),
-                               p.partitionIdx,
-                               batchSize,
-                       )
-
-                       if sr.totalChunks > 1 {
-                               if sr.chunkID == 0 {
-                                       sr.chunkRecorder.setFirstChunkID(
-                                               &messageID{
-                                                       
int64(response.MessageId.GetLedgerId()),
-                                                       
int64(response.MessageId.GetEntryId()),
-                                                       -1,
-                                                       p.partitionIdx,
-                                                       0,
-                                               })
-                               } else if sr.chunkID == sr.totalChunks-1 {
-                                       sr.chunkRecorder.setLastChunkID(
-                                               &messageID{
-                                                       
int64(response.MessageId.GetLedgerId()),
-                                                       
int64(response.MessageId.GetEntryId()),
-                                                       -1,
-                                                       p.partitionIdx,
-                                                       0,
-                                               })
-                                       // use chunkMsgID to set msgID
-                                       msgID = &sr.chunkRecorder.chunkedMsgID
-                               }
+       // lock the pending item while sending the requests
+       pi.Lock()
+       defer pi.Unlock()
+       p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 
1.0e9)
+       batchSize := int32(len(pi.sendRequests))
+       for idx, i := range pi.sendRequests {
+               sr := i.(*sendRequest)
+               atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
+
+               msgID := newMessageID(
+                       int64(response.MessageId.GetLedgerId()),
+                       int64(response.MessageId.GetEntryId()),
+                       int32(idx),
+                       p.partitionIdx,
+                       batchSize,
+               )
+
+               if sr.totalChunks > 1 {
+                       if sr.chunkID == 0 {
+                               sr.chunkRecorder.setFirstChunkID(
+                                       &messageID{
+                                               
int64(response.MessageId.GetLedgerId()),
+                                               
int64(response.MessageId.GetEntryId()),
+                                               -1,
+                                               p.partitionIdx,
+                                               0,
+                                       })
+                       } else if sr.chunkID == sr.totalChunks-1 {
+                               sr.chunkRecorder.setLastChunkID(
+                                       &messageID{
+                                               
int64(response.MessageId.GetLedgerId()),
+                                               
int64(response.MessageId.GetEntryId()),
+                                               -1,
+                                               p.partitionIdx,
+                                               0,
+                                       })
+                               // use chunkMsgID to set msgID
+                               msgID = &sr.chunkRecorder.chunkedMsgID
                        }
-
-                       sr.done(msgID, nil)
                }
 
-               // Mark this pending item as done
-               pi.done(nil)
+               sr.done(msgID, nil)
        }
+
+       // Mark this pending item as done
+       pi.done(nil)
 }
 
 func (p *partitionProducer) internalClose(req *closeProducer) {
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index afd1f09a..b58c0608 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -153,7 +153,7 @@ func TestProducerAsyncSend(t *testing.T) {
        for i := 0; i < 10; i++ {
                producer.SendAsync(context.Background(), &ProducerMessage{
                        Payload: []byte("hello"),
-               }, func(id MessageID, message *ProducerMessage, e error) {
+               }, func(id MessageID, _ *ProducerMessage, e error) {
                        if e != nil {
                                log.WithError(e).Error("Failed to publish")
                                errors.Put(e)
@@ -174,7 +174,7 @@ func TestProducerAsyncSend(t *testing.T) {
        assert.Equal(t, 0, errors.Size())
 
        wg.Add(1)
-       producer.SendAsync(context.Background(), nil, func(id MessageID, m 
*ProducerMessage, e error) {
+       producer.SendAsync(context.Background(), nil, func(id MessageID, _ 
*ProducerMessage, e error) {
                assert.NotNil(t, e)
                assert.Nil(t, id)
                wg.Done()
@@ -183,7 +183,7 @@ func TestProducerAsyncSend(t *testing.T) {
 
        wg.Add(1)
        producer.SendAsync(context.Background(), &ProducerMessage{Payload: 
[]byte("hello"), Value: []byte("hello")},
-               func(id MessageID, m *ProducerMessage, e error) {
+               func(id MessageID, _ *ProducerMessage, e error) {
                        assert.NotNil(t, e)
                        assert.Nil(t, id)
                        wg.Done()
@@ -214,7 +214,7 @@ func TestProducerFlushDisableBatching(t *testing.T) {
        for i := 0; i < 10; i++ {
                producer.SendAsync(context.Background(), &ProducerMessage{
                        Payload: []byte("hello"),
-               }, func(id MessageID, message *ProducerMessage, e error) {
+               }, func(id MessageID, _ *ProducerMessage, e error) {
                        if e != nil {
                                log.WithError(e).Error("Failed to publish")
                                errors.Put(e)
@@ -383,7 +383,7 @@ func TestFlushInProducer(t *testing.T) {
                messageContent := prefix + fmt.Sprintf("%d", i)
                producer.SendAsync(ctx, &ProducerMessage{
                        Payload: []byte(messageContent),
-               }, func(id MessageID, producerMessage *ProducerMessage, e 
error) {
+               }, func(id MessageID, _ *ProducerMessage, e error) {
                        if e != nil {
                                log.WithError(e).Error("Failed to publish")
                                errors.Put(e)
@@ -424,7 +424,7 @@ func TestFlushInProducer(t *testing.T) {
                messageContent := prefix + fmt.Sprintf("%d", i)
                producer.SendAsync(ctx, &ProducerMessage{
                        Payload: []byte(messageContent),
-               }, func(id MessageID, producerMessage *ProducerMessage, e 
error) {
+               }, func(id MessageID, _ *ProducerMessage, e error) {
                        if e != nil {
                                log.WithError(e).Error("Failed to publish")
                                errors.Put(e)
@@ -494,7 +494,7 @@ func TestFlushInPartitionedProducer(t *testing.T) {
                messageContent := prefix + fmt.Sprintf("%d", i)
                producer.SendAsync(ctx, &ProducerMessage{
                        Payload: []byte(messageContent),
-               }, func(id MessageID, producerMessage *ProducerMessage, e 
error) {
+               }, func(id MessageID, _ *ProducerMessage, e error) {
                        if e != nil {
                                log.WithError(e).Error("Failed to publish")
                                errors.Put(e)
@@ -844,7 +844,7 @@ func TestBatchMessageFlushing(t *testing.T) {
                msg := &ProducerMessage{
                        Payload: msg,
                }
-               producer.SendAsync(ctx, msg, func(id MessageID, producerMessage 
*ProducerMessage, err error) {
+               producer.SendAsync(ctx, msg, func(_ MessageID, _ 
*ProducerMessage, _ error) {
                        ch <- struct{}{}
                })
        }
@@ -898,7 +898,7 @@ func TestBatchDelayMessage(t *testing.T) {
        }
        var delayMsgID int64
        ch := make(chan struct{}, 2)
-       producer.SendAsync(ctx, delayMsg, func(id MessageID, producerMessage 
*ProducerMessage, err error) {
+       producer.SendAsync(ctx, delayMsg, func(id MessageID, _ 
*ProducerMessage, _ error) {
                atomic.StoreInt64(&delayMsgID, id.(*messageID).entryID)
                ch <- struct{}{}
        })
@@ -914,7 +914,7 @@ func TestBatchDelayMessage(t *testing.T) {
                Payload: []byte("no delay"),
        }
        var noDelayMsgID int64
-       producer.SendAsync(ctx, noDelayMsg, func(id MessageID, producerMessage 
*ProducerMessage, err error) {
+       producer.SendAsync(ctx, noDelayMsg, func(id MessageID, _ 
*ProducerMessage, _ error) {
                atomic.StoreInt64(&noDelayMsgID, id.(*messageID).entryID)
        })
        for i := 0; i < 2; i++ {
@@ -1174,7 +1174,7 @@ func TestFailedSchemaEncode(t *testing.T) {
        // producer should send return an error as message is Int64, but schema 
is String
        producer.SendAsync(ctx, &ProducerMessage{
                Value: int64(1),
-       }, func(messageID MessageID, producerMessage *ProducerMessage, err 
error) {
+       }, func(messageID MessageID, _ *ProducerMessage, err error) {
                assert.NotNil(t, err)
                assert.Nil(t, messageID)
                wg.Done()
@@ -1503,9 +1503,9 @@ func TestProducuerSendFailOnInvalidKey(t *testing.T) {
 
 type noopProduceInterceptor struct{}
 
-func (noopProduceInterceptor) BeforeSend(producer Producer, message 
*ProducerMessage) {}
+func (noopProduceInterceptor) BeforeSend(_ Producer, _ *ProducerMessage) {}
 
-func (noopProduceInterceptor) OnSendAcknowledgement(producer Producer, message 
*ProducerMessage, msgID MessageID) {
+func (noopProduceInterceptor) OnSendAcknowledgement(_ Producer, _ 
*ProducerMessage, _ MessageID) {
 }
 
 // copyPropertyIntercepotr copy all keys in message properties map and add a 
suffix
@@ -1514,11 +1514,11 @@ type metricProduceInterceptor struct {
        ackn  int
 }
 
-func (x *metricProduceInterceptor) BeforeSend(producer Producer, message 
*ProducerMessage) {
+func (x *metricProduceInterceptor) BeforeSend(_ Producer, _ *ProducerMessage) {
        x.sendn++
 }
 
-func (x *metricProduceInterceptor) OnSendAcknowledgement(producer Producer, 
message *ProducerMessage, msgID MessageID) {
+func (x *metricProduceInterceptor) OnSendAcknowledgement(_ Producer, _ 
*ProducerMessage, _ MessageID) {
        x.ackn++
 }
 
@@ -1734,7 +1734,7 @@ func TestMultipleSchemaOfKeyBasedBatchProducerConsumer(t 
*testing.T) {
                                Payload: messageContent,
                                Key:     key,
                                Schema:  schema,
-                       }, func(id MessageID, producerMessage *ProducerMessage, 
err error) {
+                       }, func(id MessageID, _ *ProducerMessage, err error) {
                                assert.NoError(t, err)
                                assert.NotNil(t, id)
                        })
@@ -1827,7 +1827,7 @@ func TestMultipleSchemaProducerConsumer(t *testing.T) {
                        Payload: messageContent,
                        Key:     key,
                        Schema:  schema,
-               }, func(id MessageID, producerMessage *ProducerMessage, err 
error) {
+               }, func(id MessageID, _ *ProducerMessage, err error) {
                        assert.NoError(t, err)
                        assert.NotNil(t, id)
                })
@@ -2025,16 +2025,16 @@ func TestMemLimitRejectProducerMessages(t *testing.T) {
        for i := 0; i < n/2; i++ {
                producer1.SendAsync(context.Background(), &ProducerMessage{
                        Payload: make([]byte, 1024),
-               }, func(id MessageID, message *ProducerMessage, e error) {})
+               }, func(_ MessageID, _ *ProducerMessage, _ error) {})
 
                producer2.SendAsync(context.Background(), &ProducerMessage{
                        Payload: make([]byte, 1024),
-               }, func(id MessageID, message *ProducerMessage, e error) {})
+               }, func(_ MessageID, _ *ProducerMessage, _ error) {})
        }
        // Last message in order to reach the limit
        producer1.SendAsync(context.Background(), &ProducerMessage{
                Payload: make([]byte, 1024),
-       }, func(id MessageID, message *ProducerMessage, e error) {})
+       }, func(_ MessageID, _ *ProducerMessage, _ error) {})
        time.Sleep(100 * time.Millisecond)
        assert.Equal(t, int64(n*1024), c.(*client).memLimit.CurrentUsage())
 
@@ -2112,18 +2112,18 @@ func TestMemLimitRejectProducerMessagesWithSchema(t 
*testing.T) {
                producer1.SendAsync(context.Background(), &ProducerMessage{
                        Value:  value,
                        Schema: schema,
-               }, func(id MessageID, message *ProducerMessage, e error) {})
+               }, func(_ MessageID, _ *ProducerMessage, _ error) {})
 
                producer2.SendAsync(context.Background(), &ProducerMessage{
                        Value:  value,
                        Schema: schema,
-               }, func(id MessageID, message *ProducerMessage, e error) {})
+               }, func(_ MessageID, _ *ProducerMessage, _ error) {})
        }
        // Last message in order to reach the limit
        producer1.SendAsync(context.Background(), &ProducerMessage{
                Value:  value,
                Schema: schema,
-       }, func(id MessageID, message *ProducerMessage, e error) {})
+       }, func(_ MessageID, _ *ProducerMessage, _ error) {})
        time.Sleep(100 * time.Millisecond)
        assert.Equal(t, int64(n*6), c.(*client).memLimit.CurrentUsage())
 
@@ -2187,7 +2187,7 @@ func TestMemLimitRejectProducerMessagesWithChunking(t 
*testing.T) {
 
        producer2.SendAsync(context.Background(), &ProducerMessage{
                Payload: make([]byte, 5*1024+1),
-       }, func(id MessageID, message *ProducerMessage, e error) {
+       }, func(_ MessageID, _ *ProducerMessage, e error) {
                if e != nil {
                        t.Fatal(e)
                }
@@ -2247,7 +2247,7 @@ func TestMemLimitContextCancel(t *testing.T) {
        for i := 0; i < n; i++ {
                producer.SendAsync(ctx, &ProducerMessage{
                        Payload: make([]byte, 1024),
-               }, func(id MessageID, message *ProducerMessage, e error) {})
+               }, func(_ MessageID, _ *ProducerMessage, _ error) {})
        }
        time.Sleep(100 * time.Millisecond)
        assert.Equal(t, int64(n*1024), c.(*client).memLimit.CurrentUsage())
@@ -2257,7 +2257,7 @@ func TestMemLimitContextCancel(t *testing.T) {
        go func() {
                producer.SendAsync(ctx, &ProducerMessage{
                        Payload: make([]byte, 1024),
-               }, func(id MessageID, message *ProducerMessage, e error) {
+               }, func(_ MessageID, _ *ProducerMessage, e error) {
                        assert.Error(t, e)
                        assert.ErrorContains(t, e, getResultStr(TimeoutError))
                        wg.Done()
@@ -2369,7 +2369,7 @@ func TestFailPendingMessageWithClose(t *testing.T) {
        for i := 0; i < 3; i++ {
                testProducer.SendAsync(context.Background(), &ProducerMessage{
                        Payload: make([]byte, 1024),
-               }, func(id MessageID, message *ProducerMessage, e error) {
+               }, func(_ MessageID, _ *ProducerMessage, e error) {
                        if e != nil {
                                assert.True(t, errors.Is(e, ErrProducerClosed))
                        }
@@ -2517,7 +2517,7 @@ func TestProducerWithMaxConnectionsPerBroker(t 
*testing.T) {
 
                var ok int32
                testProducer.SendAsync(context.Background(), 
&ProducerMessage{Value: []byte("hello")},
-                       func(id MessageID, producerMessage *ProducerMessage, 
err error) {
+                       func(_ MessageID, _ *ProducerMessage, err error) {
                                if err == nil {
                                        atomic.StoreInt32(&ok, 1)
                                }
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 3c928c1d..d2fa5597 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -320,7 +320,7 @@ func TestReaderOnSpecificMessageWithBatching(t *testing.T) {
 
                producer.SendAsync(ctx, &ProducerMessage{
                        Payload: []byte(fmt.Sprintf("hello-%d", i)),
-               }, func(id MessageID, producerMessage *ProducerMessage, err 
error) {
+               }, func(id MessageID, _ *ProducerMessage, err error) {
                        assert.NoError(t, err)
                        assert.NotNil(t, id)
                        msgIDs[idx] = id
@@ -396,7 +396,7 @@ func TestReaderOnLatestWithBatching(t *testing.T) {
 
                producer.SendAsync(ctx, &ProducerMessage{
                        Payload: []byte(fmt.Sprintf("hello-%d", i)),
-               }, func(id MessageID, producerMessage *ProducerMessage, err 
error) {
+               }, func(id MessageID, _ *ProducerMessage, err error) {
                        assert.NoError(t, err)
                        assert.NotNil(t, id)
                        msgIDs[idx] = id
diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go
index c8aa0b94..a9a67adb 100644
--- a/pulsar/retry_router.go
+++ b/pulsar/retry_router.go
@@ -98,8 +98,8 @@ func (r *retryRouter) run() {
                        producer := r.getProducer()
 
                        msgID := rm.consumerMsg.ID()
-                       producer.SendAsync(context.Background(), 
&rm.producerMsg, func(messageID MessageID,
-                               producerMessage *ProducerMessage, err error) {
+                       producer.SendAsync(context.Background(), 
&rm.producerMsg, func(_ MessageID,
+                               _ *ProducerMessage, err error) {
                                if err != nil {
                                        r.log.WithError(err).WithField("msgID", 
msgID).Error("Failed to send message to RLQ")
                                        
rm.consumerMsg.Consumer.Nack(rm.consumerMsg)
@@ -150,9 +150,8 @@ func (r *retryRouter) getProducer() Producer {
                        r.log.WithError(err).Error("Failed to create RLQ 
producer")
                        time.Sleep(bo.Next())
                        continue
-               } else {
-                       r.producer = producer
-                       return producer
                }
+               r.producer = producer
+               return producer
        }
 }
diff --git a/pulsar/table_view_test.go b/pulsar/table_view_test.go
index 2368e3d8..f9f8aec1 100644
--- a/pulsar/table_view_test.go
+++ b/pulsar/table_view_test.go
@@ -55,7 +55,7 @@ func TestTableView(t *testing.T) {
                t.Log(key)
                _, err = producer.Send(context.Background(), &ProducerMessage{
                        Key:   key,
-                       Value: fmt.Sprintf(valuePrefix + key),
+                       Value: valuePrefix + key,
                })
                assert.NoError(t, err)
        }
@@ -329,7 +329,7 @@ func TestForEachAndListenJSONSchema(t *testing.T) {
                t.Log("foreach" + key)
                s, ok := value.(testJSON)
                assert.Truef(t, ok, "expected value to be testJSON type got 
%T", value)
-               assert.Equal(t, fmt.Sprintf(valuePrefix+key), s.Name)
+               assert.Equal(t, valuePrefix+key, s.Name)
                return nil
        })
 
@@ -349,7 +349,7 @@ func TestForEachAndListenJSONSchema(t *testing.T) {
                        Key: key,
                        Value: testJSON{
                                ID:   i,
-                               Name: fmt.Sprintf(valuePrefix + key),
+                               Name: valuePrefix + key,
                        },
                })
                assert.NoError(t, err)
diff --git a/pulsar/transaction_impl.go b/pulsar/transaction_impl.go
index 2eb8aca9..82e6fcfa 100644
--- a/pulsar/transaction_impl.go
+++ b/pulsar/transaction_impl.go
@@ -84,7 +84,7 @@ func (txn *transaction) GetState() TxnState {
        return txn.state
 }
 
-func (txn *transaction) Commit(ctx context.Context) error {
+func (txn *transaction) Commit(_ context.Context) error {
        if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), int32(TxnOpen), 
int32(TxnCommitting)) ||
                txn.state == TxnCommitting) {
                return newError(InvalidStatus, "Expect transaction state is 
TxnOpen but "+txn.state.string())
@@ -110,7 +110,7 @@ func (txn *transaction) Commit(ctx context.Context) error {
        return err
 }
 
-func (txn *transaction) Abort(ctx context.Context) error {
+func (txn *transaction) Abort(_ context.Context) error {
        if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), int32(TxnOpen), 
int32(TxnAborting)) ||
                txn.state == TxnAborting) {
                return newError(InvalidStatus, "Expect transaction state is 
TxnOpen but "+txn.state.string())
diff --git a/pulsaradmin/pkg/admin/admin_test.go 
b/pulsaradmin/pkg/admin/admin_test.go
index c4fa5295..f6cd3263 100644
--- a/pulsaradmin/pkg/admin/admin_test.go
+++ b/pulsaradmin/pkg/admin/admin_test.go
@@ -61,7 +61,7 @@ type customAuthProvider struct {
 
 var _ auth.Provider = &customAuthProvider{}
 
-func (c *customAuthProvider) RoundTrip(req *http.Request) (*http.Response, 
error) {
+func (c *customAuthProvider) RoundTrip(_ *http.Request) (*http.Response, 
error) {
        panic("implement me")
 }
 
diff --git a/pulsaradmin/pkg/admin/auth/oauth2.go 
b/pulsaradmin/pkg/admin/auth/oauth2.go
index 4bd15461..59587abf 100644
--- a/pulsaradmin/pkg/admin/auth/oauth2.go
+++ b/pulsaradmin/pkg/admin/auth/oauth2.go
@@ -121,7 +121,7 @@ func NewAuthenticationOAuth2WithParams(
        issuerEndpoint,
        clientID,
        audience string,
-       scope string,
+       _ string,
        transport http.RoundTripper) (*OAuth2Provider, error) {
 
        issuer := oauth2.Issuer{
@@ -257,6 +257,6 @@ func makeKeyring() (keyring.Keyring, error) {
        })
 }
 
-func keyringPrompt(prompt string) (string, error) {
+func keyringPrompt(_ string) (string, error) {
        return "", nil
 }
diff --git a/pulsaradmin/pkg/admin/auth/oauth2_test.go 
b/pulsaradmin/pkg/admin/auth/oauth2_test.go
index b25e5761..b19133c7 100644
--- a/pulsaradmin/pkg/admin/auth/oauth2_test.go
+++ b/pulsaradmin/pkg/admin/auth/oauth2_test.go
@@ -37,7 +37,7 @@ func mockOAuthServer() *httptest.Server {
 
        // mock the used REST path for the tests
        mockedHandler := http.NewServeMux()
-       mockedHandler.HandleFunc("/.well-known/openid-configuration", 
func(writer http.ResponseWriter, request *http.Request) {
+       mockedHandler.HandleFunc("/.well-known/openid-configuration", 
func(writer http.ResponseWriter, _ *http.Request) {
                s := fmt.Sprintf(`{
     "issuer":"%s",
     "authorization_endpoint":"%s/authorize",
@@ -46,10 +46,10 @@ func mockOAuthServer() *httptest.Server {
 }`, server.URL, server.URL, server.URL, server.URL)
                fmt.Fprintln(writer, s)
        })
-       mockedHandler.HandleFunc("/oauth/token", func(writer 
http.ResponseWriter, request *http.Request) {
+       mockedHandler.HandleFunc("/oauth/token", func(writer 
http.ResponseWriter, _ *http.Request) {
                fmt.Fprintln(writer, "{\n  \"access_token\": 
\"token-content\",\n  \"token_type\": \"Bearer\"\n}")
        })
-       mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, 
request *http.Request) {
+       mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, 
_ *http.Request) {
                fmt.Fprintln(writer, "true")
        })
 
diff --git a/pulsaradmin/pkg/admin/namespace.go 
b/pulsaradmin/pkg/admin/namespace.go
index 9c3f3e3f..e92f020c 100644
--- a/pulsaradmin/pkg/admin/namespace.go
+++ b/pulsaradmin/pkg/admin/namespace.go
@@ -148,7 +148,7 @@ type Namespaces interface {
        GetNamespaceReplicationClusters(namespace string) ([]string, error)
 
        // SetNamespaceReplicationClusters returns the replication clusters for 
a namespace
-       SetNamespaceReplicationClusters(namespace string, clusterIds []string) 
error
+       SetNamespaceReplicationClusters(namespace string, clusterIDs []string) 
error
 
        // SetNamespaceAntiAffinityGroup sets anti-affinity group name for a 
namespace
        SetNamespaceAntiAffinityGroup(namespace string, 
namespaceAntiAffinityGroup string) error
@@ -616,13 +616,13 @@ func (n *namespaces) 
GetNamespaceReplicationClusters(namespace string) ([]string
        return data, err
 }
 
-func (n *namespaces) SetNamespaceReplicationClusters(namespace string, 
clusterIds []string) error {
+func (n *namespaces) SetNamespaceReplicationClusters(namespace string, 
clusterIDs []string) error {
        nsName, err := utils.GetNamespaceName(namespace)
        if err != nil {
                return err
        }
        endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), 
"replication")
-       return n.pulsar.Client.Post(endpoint, &clusterIds)
+       return n.pulsar.Client.Post(endpoint, &clusterIDs)
 }
 
 func (n *namespaces) SetNamespaceAntiAffinityGroup(namespace string, 
namespaceAntiAffinityGroup string) error {
diff --git a/pulsaradmin/pkg/admin/subscription_test.go 
b/pulsaradmin/pkg/admin/subscription_test.go
index 98db6961..087a4a37 100644
--- a/pulsaradmin/pkg/admin/subscription_test.go
+++ b/pulsaradmin/pkg/admin/subscription_test.go
@@ -67,7 +67,7 @@ func TestGetMessagesByID(t *testing.T) {
        for i := 0; i <= numberMessages; i++ {
                producer.SendAsync(ctx, &pulsar.ProducerMessage{
                        Payload: []byte(fmt.Sprintf("hello-%d", i)),
-               }, func(id pulsar.MessageID, message *pulsar.ProducerMessage, 
err error) {
+               }, func(id pulsar.MessageID, _ *pulsar.ProducerMessage, err 
error) {
                        assert.Nil(t, err)
                        messageIDMap[id.String()]++
                        wg.Done()
diff --git a/pulsaradmin/pkg/utils/data.go b/pulsaradmin/pkg/utils/data.go
index 61607912..c9664e71 100644
--- a/pulsaradmin/pkg/utils/data.go
+++ b/pulsaradmin/pkg/utils/data.go
@@ -215,7 +215,7 @@ type NamespacesData struct {
        MessageTTL                     int      `json:"messageTTL"`
        BookkeeperAckQuorum            int      `json:"bookkeeperAckQuorum"`
        ManagedLedgerMaxMarkDeleteRate float64  
`json:"managedLedgerMaxMarkDeleteRate"`
-       ClusterIds                     string   `json:"clusterIds"`
+       ClusterIDs                     string   `json:"clusterIds"`
        RetentionTimeStr               string   `json:"retentionTimeStr"`
        LimitStr                       string   `json:"limitStr"`
        LimitTime                      int64    `json:"limitTime"`
diff --git a/pulsaradmin/pkg/utils/schema_util.go 
b/pulsaradmin/pkg/utils/schema_util.go
index 671627fa..b8a17d96 100644
--- a/pulsaradmin/pkg/utils/schema_util.go
+++ b/pulsaradmin/pkg/utils/schema_util.go
@@ -56,11 +56,9 @@ type IsCompatibility struct {
 func ConvertGetSchemaResponseToSchemaInfo(tn *TopicName, response 
GetSchemaResponse) *SchemaInfo {
        info := new(SchemaInfo)
        schema := make([]byte, 0, 10)
-       if response.Type == "KEY_VALUE" {
-               // TODO: impl logic
-       } else {
+       if response.Type != "KEY_VALUE" {
                schema = []byte(response.Data)
-       }
+       } // TODO: impl logic for KEY_VALUE
 
        info.Schema = schema
        info.Type = response.Type

Reply via email to