This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 310fb94d Bump the minimum Go version to 1.22 (#1300)
310fb94d is described below
commit 310fb94d29624e380b88968f3c2d030f12a1c4af
Author: Yunze Xu <[email protected]>
AuthorDate: Sat Oct 26 09:34:35 2024 +0800
Bump the minimum Go version to 1.22 (#1300)
---
.github/workflows/ci.yml | 8 +-
Dockerfile | 2 +-
Makefile | 2 +-
go.mod | 2 +-
oauth2/auth_suite_test.go | 10 +-
oauth2/authorization_tokenretriever_test.go | 170 ++++++++++-----------
oauth2/client_credentials_flow_test.go | 55 ++++---
oauth2/clock/testing/fake_clock.go | 8 +-
oauth2/config_tokenprovider_test.go | 28 ++--
oauth2/device_code_flow_test.go | 70 ++++-----
oauth2/oidc_endpoint_provider_test.go | 42 ++---
perf/perf-consumer.go | 2 +-
perf/perf-producer.go | 4 +-
perf/pulsar-perf-go.go | 2 +-
pulsar/ack_grouping_tracker.go | 2 +-
pulsar/ack_grouping_tracker_test.go | 4 +-
pulsar/auth/athenz_test.go | 8 +-
pulsar/auth/disabled.go | 4 +-
pulsar/auth/oauth2_test.go | 6 +-
pulsar/auth/token.go | 3 +-
pulsar/client_impl_test.go | 6 +-
pulsar/consumer_impl.go | 3 +-
pulsar/consumer_multitopic.go | 4 +-
pulsar/consumer_partition.go | 31 ++--
pulsar/consumer_test.go | 70 ++++-----
pulsar/crypto/default_message_crypto.go | 2 +-
pulsar/dlq_router.go | 7 +-
pulsar/helper_for_test.go | 4 +-
pulsar/internal/channel_cond_test.go | 4 +-
pulsar/internal/compression/noop.go | 2 +-
pulsar/internal/compression/zstd_cgo.go | 2 +-
pulsar/internal/compression/zstd_go.go | 2 +-
pulsar/internal/crypto/consumer_decryptor.go | 2 +-
pulsar/internal/crypto/noop_decryptor.go | 2 +-
pulsar/internal/crypto/noop_encryptor.go | 2 +-
pulsar/internal/http_client.go | 4 +-
pulsar/internal/lookup_service.go | 2 +-
pulsar/internal/lookup_service_test.go | 36 ++---
.../internal/pulsartracing/consumer_interceptor.go | 4 +-
.../pulsartracing/consumer_interceptor_test.go | 26 ++--
.../pulsartracing/message_carrier_adaptors.go | 8 +-
.../pulsartracing/message_carrier_util_test.go | 2 +-
.../internal/pulsartracing/producer_interceptor.go | 6 +-
.../pulsartracing/producer_interceptor_test.go | 2 +-
pulsar/log/log.go | 44 +++---
pulsar/negative_acks_tracker.go | 10 +-
pulsar/negative_acks_tracker_test.go | 64 ++++----
pulsar/producer_partition.go | 95 ++++++------
pulsar/producer_test.go | 56 +++----
pulsar/reader_test.go | 4 +-
pulsar/retry_router.go | 9 +-
pulsar/table_view_test.go | 6 +-
pulsar/transaction_impl.go | 4 +-
pulsaradmin/pkg/admin/admin_test.go | 2 +-
pulsaradmin/pkg/admin/auth/oauth2.go | 4 +-
pulsaradmin/pkg/admin/auth/oauth2_test.go | 6 +-
pulsaradmin/pkg/admin/namespace.go | 6 +-
pulsaradmin/pkg/admin/subscription_test.go | 2 +-
pulsaradmin/pkg/utils/data.go | 2 +-
pulsaradmin/pkg/utils/schema_util.go | 6 +-
60 files changed, 488 insertions(+), 497 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c0b77c60..52781048 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -22,7 +22,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- go-version: [ '1.21', '1.22' ]
+ go-version: [ '1.22', '1.23' ]
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v5
@@ -36,19 +36,19 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-go@v5
with:
- go-version: '1.20'
+ go-version: '1.22'
- name: Check license header
run: docker run --rm -v $(pwd):/github/workspace
ghcr.io/korandoru/hawkeye-native:v3 check
- name: Run golangci-lint
uses: golangci/golangci-lint-action@v6
with:
- version: v1.51.2
+ version: v1.61.0
integration-tests:
runs-on: ubuntu-latest
strategy:
matrix:
- go-version: [ '1.21', '1.22' ]
+ go-version: [ '1.22', '1.23' ]
steps:
- uses: actions/checkout@v3
- name: clean docker cache
diff --git a/Dockerfile b/Dockerfile
index fecfa98f..8895c076 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -19,7 +19,7 @@
# set via the Makefile or CLI
ARG PULSAR_IMAGE=apachepulsar/pulsar:latest
-ARG GO_VERSION=1.20
+ARG GO_VERSION=1.22
FROM golang:$GO_VERSION as golang
FROM $PULSAR_IMAGE
diff --git a/Makefile b/Makefile
index 23844c41..5c38303e 100644
--- a/Makefile
+++ b/Makefile
@@ -20,7 +20,7 @@
IMAGE_NAME = pulsar-client-go-test:latest
PULSAR_VERSION ?= 3.2.2
PULSAR_IMAGE = apachepulsar/pulsar:$(PULSAR_VERSION)
-GO_VERSION ?= 1.21
+GO_VERSION ?= 1.22
CONTAINER_ARCH ?= $(shell uname -m | sed s/x86_64/amd64/)
# Golang standard bin directory.
diff --git a/go.mod b/go.mod
index 1a7025fd..c2f3457a 100644
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,6 @@
module github.com/apache/pulsar-client-go
-go 1.21
+go 1.22
require (
github.com/99designs/keyring v1.2.1
diff --git a/oauth2/auth_suite_test.go b/oauth2/auth_suite_test.go
index 95accff2..54b24299 100644
--- a/oauth2/auth_suite_test.go
+++ b/oauth2/auth_suite_test.go
@@ -21,13 +21,13 @@ import (
"context"
"testing"
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
+ "github.com/onsi/ginkgo"
+ "github.com/onsi/gomega"
)
func TestAuth(t *testing.T) {
- RegisterFailHandler(Fail)
- RunSpecs(t, "cloud-cli Auth Suite")
+ gomega.RegisterFailHandler(ginkgo.Fail)
+ ginkgo.RunSpecs(t, "cloud-cli Auth Suite")
}
type MockTokenExchanger struct {
@@ -52,7 +52,7 @@ func (te *MockTokenExchanger) ExchangeClientCredentials(req
ClientCredentialsExc
return te.ReturnsTokens, te.ReturnsError
}
-func (te *MockTokenExchanger) ExchangeDeviceCode(ctx context.Context,
+func (te *MockTokenExchanger) ExchangeDeviceCode(_ context.Context,
req DeviceCodeExchangeRequest) (*TokenResult, error) {
te.CalledWithRequest = &req
return te.ReturnsTokens, te.ReturnsError
diff --git a/oauth2/authorization_tokenretriever_test.go
b/oauth2/authorization_tokenretriever_test.go
index 6ae55d8f..d57f9dc3 100644
--- a/oauth2/authorization_tokenretriever_test.go
+++ b/oauth2/authorization_tokenretriever_test.go
@@ -26,8 +26,8 @@ import (
"strings"
"time"
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
+ "github.com/onsi/ginkgo"
+ "github.com/onsi/gomega"
)
type MockTransport struct {
@@ -37,7 +37,7 @@ type MockTransport struct {
var _ HTTPAuthTransport = &MockTransport{}
-func (t *MockTransport) Do(req *http.Request) (*http.Response, error) {
+func (t *MockTransport) Do(_ *http.Request) (*http.Response, error) {
if len(t.Responses) > 0 {
r := t.Responses[0]
t.Responses = t.Responses[1:]
@@ -46,9 +46,9 @@ func (t *MockTransport) Do(req *http.Request)
(*http.Response, error) {
return nil, t.ReturnError
}
-var _ = Describe("CodetokenExchanger", func() {
- Describe("newExchangeCodeRequest", func() {
- It("creates the request", func() {
+var _ = ginkgo.Describe("CodetokenExchanger", func() {
+ ginkgo.Describe("newExchangeCodeRequest", func() {
+ ginkgo.It("creates the request", func() {
tokenRetriever := TokenRetriever{}
exchangeRequest := AuthorizationCodeExchangeRequest{
TokenEndpoint: "https://issuer/oauth/token",
@@ -62,32 +62,32 @@ var _ = Describe("CodetokenExchanger", func() {
result.ParseForm()
- Expect(err).To(BeNil())
-
Expect(result.FormValue("grant_type")).To(Equal("authorization_code"))
-
Expect(result.FormValue("client_id")).To(Equal("clientID"))
-
Expect(result.FormValue("code_verifier")).To(Equal("Verifier"))
- Expect(result.FormValue("code")).To(Equal("code"))
-
Expect(result.FormValue("redirect_uri")).To(Equal("https://redirect"))
-
Expect(result.URL.String()).To(Equal("https://issuer/oauth/token"))
+ gomega.Expect(err).To(gomega.BeNil())
+
gomega.Expect(result.FormValue("grant_type")).To(gomega.Equal("authorization_code"))
+
gomega.Expect(result.FormValue("client_id")).To(gomega.Equal("clientID"))
+
gomega.Expect(result.FormValue("code_verifier")).To(gomega.Equal("Verifier"))
+
gomega.Expect(result.FormValue("code")).To(gomega.Equal("code"))
+
gomega.Expect(result.FormValue("redirect_uri")).To(gomega.Equal("https://redirect"))
+
gomega.Expect(result.URL.String()).To(gomega.Equal("https://issuer/oauth/token"))
-
Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded"))
-
Expect(result.Header.Get("Content-Length")).To(Equal("117"))
+
gomega.Expect(result.Header.Get("Content-Type")).To(gomega.Equal("application/x-www-form-urlencoded"))
+
gomega.Expect(result.Header.Get("Content-Length")).To(gomega.Equal("117"))
})
- It("returns an error when NewRequest returns an error", func() {
+ ginkgo.It("returns an error when NewRequest returns an error",
func() {
tokenRetriever := TokenRetriever{}
result, err :=
tokenRetriever.newExchangeCodeRequest(AuthorizationCodeExchangeRequest{
TokenEndpoint: "://issuer/oauth/token",
})
- Expect(result).To(BeNil())
- Expect(err.Error()).To(Equal("parse
\"://issuer/oauth/token\": missing protocol scheme"))
+ gomega.Expect(result).To(gomega.BeNil())
+ gomega.Expect(err.Error()).To(gomega.Equal("parse
\"://issuer/oauth/token\": missing protocol scheme"))
})
})
- Describe("handleAuthTokensResponse", func() {
- It("handles the response", func() {
+ ginkgo.Describe("handleAuthTokensResponse", func() {
+ ginkgo.It("handles the response", func() {
tokenRetriever := TokenRetriever{}
response := buildResponse(200,
AuthorizationTokenResponse{
ExpiresIn: 1,
@@ -97,49 +97,49 @@ var _ = Describe("CodetokenExchanger", func() {
result, err :=
tokenRetriever.handleAuthTokensResponse(response)
- Expect(err).To(BeNil())
- Expect(result).To(Equal(&TokenResult{
+ gomega.Expect(err).To(gomega.BeNil())
+ gomega.Expect(result).To(gomega.Equal(&TokenResult{
ExpiresIn: 1,
AccessToken: "myAccessToken",
RefreshToken: "myRefreshToken",
}))
})
- It("returns error when status code is not successful", func() {
+ ginkgo.It("returns error when status code is not successful",
func() {
tokenRetriever := TokenRetriever{}
response := buildResponse(500, nil)
result, err :=
tokenRetriever.handleAuthTokensResponse(response)
- Expect(result).To(BeNil())
- Expect(err.Error()).To(Not(BeNil()))
+ gomega.Expect(result).To(gomega.BeNil())
+
gomega.Expect(err.Error()).To(gomega.Not(gomega.BeNil()))
})
- It("returns typed error when response body contains error
information", func() {
+ ginkgo.It("returns typed error when response body contains
error information", func() {
errorBody := TokenErrorResponse{Error: "test",
ErrorDescription: "test description"}
tokenRetriever := TokenRetriever{}
response := buildResponse(400, errorBody)
result, err :=
tokenRetriever.handleAuthTokensResponse(response)
- Expect(result).To(BeNil())
- Expect(err).To(Equal(&TokenError{ErrorCode: "test",
ErrorDescription: "test description"}))
- Expect(err.Error()).To(Equal("test description (test)"))
+ gomega.Expect(result).To(gomega.BeNil())
+
gomega.Expect(err).To(gomega.Equal(&TokenError{ErrorCode: "test",
ErrorDescription: "test description"}))
+ gomega.Expect(err.Error()).To(gomega.Equal("test
description (test)"))
})
- It("returns error when deserialization fails", func() {
+ ginkgo.It("returns error when deserialization fails", func() {
tokenRetriever := TokenRetriever{}
response := buildResponse(200, "")
result, err :=
tokenRetriever.handleAuthTokensResponse(response)
- Expect(result).To(BeNil())
- Expect(err.Error()).To(Equal(
+ gomega.Expect(result).To(gomega.BeNil())
+ gomega.Expect(err.Error()).To(gomega.Equal(
"json: cannot unmarshal string into Go value of
type oauth2.AuthorizationTokenResponse"))
})
})
- Describe("newRefreshTokenRequest", func() {
- It("creates the request", func() {
+ ginkgo.Describe("newRefreshTokenRequest", func() {
+ ginkgo.It("creates the request", func() {
tokenRetriever := TokenRetriever{}
exchangeRequest := RefreshTokenExchangeRequest{
TokenEndpoint: "https://issuer/oauth/token",
@@ -151,30 +151,30 @@ var _ = Describe("CodetokenExchanger", func() {
result.ParseForm()
- Expect(err).To(BeNil())
-
Expect(result.FormValue("grant_type")).To(Equal("refresh_token"))
-
Expect(result.FormValue("client_id")).To(Equal("clientID"))
-
Expect(result.FormValue("refresh_token")).To(Equal("refreshToken"))
-
Expect(result.URL.String()).To(Equal("https://issuer/oauth/token"))
+ gomega.Expect(err).To(gomega.BeNil())
+
gomega.Expect(result.FormValue("grant_type")).To(gomega.Equal("refresh_token"))
+
gomega.Expect(result.FormValue("client_id")).To(gomega.Equal("clientID"))
+
gomega.Expect(result.FormValue("refresh_token")).To(gomega.Equal("refreshToken"))
+
gomega.Expect(result.URL.String()).To(gomega.Equal("https://issuer/oauth/token"))
-
Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded"))
-
Expect(result.Header.Get("Content-Length")).To(Equal("70"))
+
gomega.Expect(result.Header.Get("Content-Type")).To(gomega.Equal("application/x-www-form-urlencoded"))
+
gomega.Expect(result.Header.Get("Content-Length")).To(gomega.Equal("70"))
})
- It("returns an error when NewRequest returns an error", func() {
+ ginkgo.It("returns an error when NewRequest returns an error",
func() {
tokenRetriever := TokenRetriever{}
result, err :=
tokenRetriever.newRefreshTokenRequest(RefreshTokenExchangeRequest{
TokenEndpoint: "://issuer/oauth/token",
})
- Expect(result).To(BeNil())
- Expect(err.Error()).To(Equal("parse
\"://issuer/oauth/token\": missing protocol scheme"))
+ gomega.Expect(result).To(gomega.BeNil())
+ gomega.Expect(err.Error()).To(gomega.Equal("parse
\"://issuer/oauth/token\": missing protocol scheme"))
})
})
- Describe("newClientCredentialsRequest", func() {
- It("creates the request", func() {
+ ginkgo.Describe("newClientCredentialsRequest", func() {
+ ginkgo.It("creates the request", func() {
tokenRetriever := TokenRetriever{}
exchangeRequest := ClientCredentialsExchangeRequest{
TokenEndpoint: "https://issuer/oauth/token",
@@ -187,31 +187,31 @@ var _ = Describe("CodetokenExchanger", func() {
result.ParseForm()
- Expect(err).To(BeNil())
-
Expect(result.FormValue("grant_type")).To(Equal("client_credentials"))
-
Expect(result.FormValue("client_id")).To(Equal("clientID"))
-
Expect(result.FormValue("client_secret")).To(Equal("clientSecret"))
-
Expect(result.FormValue("audience")).To(Equal("audience"))
-
Expect(result.URL.String()).To(Equal("https://issuer/oauth/token"))
+ gomega.Expect(err).To(gomega.BeNil())
+
gomega.Expect(result.FormValue("grant_type")).To(gomega.Equal("client_credentials"))
+
gomega.Expect(result.FormValue("client_id")).To(gomega.Equal("clientID"))
+
gomega.Expect(result.FormValue("client_secret")).To(gomega.Equal("clientSecret"))
+
gomega.Expect(result.FormValue("audience")).To(gomega.Equal("audience"))
+
gomega.Expect(result.URL.String()).To(gomega.Equal("https://issuer/oauth/token"))
-
Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded"))
-
Expect(result.Header.Get("Content-Length")).To(Equal("93"))
+
gomega.Expect(result.Header.Get("Content-Type")).To(gomega.Equal("application/x-www-form-urlencoded"))
+
gomega.Expect(result.Header.Get("Content-Length")).To(gomega.Equal("93"))
})
- It("returns an error when NewRequest returns an error", func() {
+ ginkgo.It("returns an error when NewRequest returns an error",
func() {
tokenRetriever := TokenRetriever{}
result, err :=
tokenRetriever.newClientCredentialsRequest(ClientCredentialsExchangeRequest{
TokenEndpoint: "://issuer/oauth/token",
})
- Expect(result).To(BeNil())
- Expect(err.Error()).To(Equal("parse
\"://issuer/oauth/token\": missing protocol scheme"))
+ gomega.Expect(result).To(gomega.BeNil())
+ gomega.Expect(err.Error()).To(gomega.Equal("parse
\"://issuer/oauth/token\": missing protocol scheme"))
})
})
- Describe("newDeviceCodeExchangeRequest", func() {
- It("creates the request", func() {
+ ginkgo.Describe("newDeviceCodeExchangeRequest", func() {
+ ginkgo.It("creates the request", func() {
tokenRetriever := TokenRetriever{}
exchangeRequest := DeviceCodeExchangeRequest{
TokenEndpoint: "https://issuer/oauth/token",
@@ -224,35 +224,35 @@ var _ = Describe("CodetokenExchanger", func() {
result.ParseForm()
- Expect(err).To(BeNil())
-
Expect(result.FormValue("grant_type")).To(Equal("urn:ietf:params:oauth:grant-type:device_code"))
-
Expect(result.FormValue("client_id")).To(Equal("clientID"))
-
Expect(result.FormValue("device_code")).To(Equal("deviceCode"))
-
Expect(result.URL.String()).To(Equal("https://issuer/oauth/token"))
+ gomega.Expect(err).To(gomega.BeNil())
+
gomega.Expect(result.FormValue("grant_type")).To(gomega.Equal("urn:ietf:params:oauth:grant-type:device_code"))
+
gomega.Expect(result.FormValue("client_id")).To(gomega.Equal("clientID"))
+
gomega.Expect(result.FormValue("device_code")).To(gomega.Equal("deviceCode"))
+
gomega.Expect(result.URL.String()).To(gomega.Equal("https://issuer/oauth/token"))
-
Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded"))
-
Expect(result.Header.Get("Content-Length")).To(Equal("107"))
+
gomega.Expect(result.Header.Get("Content-Type")).To(gomega.Equal("application/x-www-form-urlencoded"))
+
gomega.Expect(result.Header.Get("Content-Length")).To(gomega.Equal("107"))
})
- It("returns an error when NewRequest returns an error", func() {
+ ginkgo.It("returns an error when NewRequest returns an error",
func() {
tokenRetriever := TokenRetriever{}
result, err :=
tokenRetriever.newClientCredentialsRequest(ClientCredentialsExchangeRequest{
TokenEndpoint: "://issuer/oauth/token",
})
- Expect(result).To(BeNil())
- Expect(err.Error()).To(Equal("parse
\"://issuer/oauth/token\": missing protocol scheme"))
+ gomega.Expect(result).To(gomega.BeNil())
+ gomega.Expect(err.Error()).To(gomega.Equal("parse
\"://issuer/oauth/token\": missing protocol scheme"))
})
})
- Describe("ExchangeDeviceCode", func() {
+ ginkgo.Describe("ExchangeDeviceCode", func() {
var mockTransport *MockTransport
var tokenRetriever *TokenRetriever
var exchangeRequest DeviceCodeExchangeRequest
var tokenResult TokenResult
- BeforeEach(func() {
+ ginkgo.BeforeEach(func() {
mockTransport = &MockTransport{}
tokenRetriever = &TokenRetriever{
transport: mockTransport,
@@ -270,21 +270,21 @@ var _ = Describe("CodetokenExchanger", func() {
}
})
- It("returns a token", func() {
+ ginkgo.It("returns a token", func() {
})
- It("supports cancellation", func() {
+ ginkgo.It("supports cancellation", func() {
mockTransport.Responses = []*http.Response{
buildResponse(400,
&TokenErrorResponse{"authorization_pending", ""}),
}
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := tokenRetriever.ExchangeDeviceCode(ctx,
exchangeRequest)
- Expect(err).ToNot(BeNil())
- Expect(err.Error()).To(Equal("cancelled"))
+ gomega.Expect(err).ToNot(gomega.BeNil())
+ gomega.Expect(err.Error()).To(gomega.Equal("cancelled"))
})
- It("implements authorization_pending and slow_down", func() {
+ ginkgo.It("implements authorization_pending and slow_down",
func() {
startTime := time.Now()
mockTransport.Responses = []*http.Response{
buildResponse(400,
&TokenErrorResponse{"authorization_pending", ""}),
@@ -293,28 +293,28 @@ var _ = Describe("CodetokenExchanger", func() {
buildResponse(200, &tokenResult),
}
token, err :=
tokenRetriever.ExchangeDeviceCode(context.Background(), exchangeRequest)
- Expect(err).To(BeNil())
- Expect(token).To(Equal(&tokenResult))
+ gomega.Expect(err).To(gomega.BeNil())
+ gomega.Expect(token).To(gomega.Equal(&tokenResult))
endTime := time.Now()
- Expect(endTime.Sub(startTime)).To(BeNumerically(">",
exchangeRequest.PollInterval*3))
+
gomega.Expect(endTime.Sub(startTime)).To(gomega.BeNumerically(">",
exchangeRequest.PollInterval*3))
})
- It("implements expired_token", func() {
+ ginkgo.It("implements expired_token", func() {
mockTransport.Responses = []*http.Response{
buildResponse(400,
&TokenErrorResponse{"expired_token", ""}),
}
_, err :=
tokenRetriever.ExchangeDeviceCode(context.Background(), exchangeRequest)
- Expect(err).ToNot(BeNil())
- Expect(err.Error()).To(Equal("the device code has
expired"))
+ gomega.Expect(err).ToNot(gomega.BeNil())
+ gomega.Expect(err.Error()).To(gomega.Equal("the device
code has expired"))
})
- It("implements access_denied", func() {
+ ginkgo.It("implements access_denied", func() {
mockTransport.Responses = []*http.Response{
buildResponse(400,
&TokenErrorResponse{"access_denied", ""}),
}
_, err :=
tokenRetriever.ExchangeDeviceCode(context.Background(), exchangeRequest)
- Expect(err).ToNot(BeNil())
- Expect(err.Error()).To(Equal("the device was not
authorized"))
+ gomega.Expect(err).ToNot(gomega.BeNil())
+ gomega.Expect(err.Error()).To(gomega.Equal("the device
was not authorized"))
})
})
})
diff --git a/oauth2/client_credentials_flow_test.go
b/oauth2/client_credentials_flow_test.go
index 6d9db354..8fd0a110 100644
--- a/oauth2/client_credentials_flow_test.go
+++ b/oauth2/client_credentials_flow_test.go
@@ -23,9 +23,8 @@ import (
"github.com/apache/pulsar-client-go/oauth2/clock"
"github.com/apache/pulsar-client-go/oauth2/clock/testing"
-
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
+ "github.com/onsi/ginkgo"
+ "github.com/onsi/gomega"
)
type MockClientCredentialsProvider struct {
@@ -50,13 +49,13 @@ var clientCredentials = KeyFile{
Scope: "test_scope",
}
-var _ = Describe("ClientCredentialsFlow", func() {
- Describe("Authorize", func() {
+var _ = ginkgo.Describe("ClientCredentialsFlow", func() {
+ ginkgo.Describe("Authorize", func() {
var mockClock clock.Clock
var mockTokenExchanger *MockTokenExchanger
- BeforeEach(func() {
+ ginkgo.BeforeEach(func() {
mockClock = testing.NewFakeClock(time.Unix(0, 0))
expectedTokens := TokenResult{AccessToken:
"accessToken", RefreshToken: "refreshToken", ExpiresIn: 1234}
mockTokenExchanger = &MockTokenExchanger{
@@ -64,7 +63,7 @@ var _ = Describe("ClientCredentialsFlow", func() {
}
})
- It("invokes TokenExchanger with credentials", func() {
+ ginkgo.It("invokes TokenExchanger with credentials", func() {
additionalScope := "additional_scope"
provider := newClientCredentialsFlow(
ClientCredentialsFlowOptions{
@@ -78,8 +77,8 @@ var _ = Describe("ClientCredentialsFlow", func() {
)
_, err := provider.Authorize("test_audience")
- Expect(err).ToNot(HaveOccurred())
-
Expect(mockTokenExchanger.CalledWithRequest).To(Equal(&ClientCredentialsExchangeRequest{
+ gomega.Expect(err).ToNot(gomega.HaveOccurred())
+
gomega.Expect(mockTokenExchanger.CalledWithRequest).To(gomega.Equal(&ClientCredentialsExchangeRequest{
TokenEndpoint: oidcEndpoints.TokenEndpoint,
ClientID: clientCredentials.ClientID,
ClientSecret: clientCredentials.ClientSecret,
@@ -88,7 +87,7 @@ var _ = Describe("ClientCredentialsFlow", func() {
}))
})
- It("returns TokensResult from TokenExchanger", func() {
+ ginkgo.It("returns TokensResult from TokenExchanger", func() {
provider := newClientCredentialsFlow(
ClientCredentialsFlowOptions{
KeyFile: "test_keyfile",
@@ -100,12 +99,12 @@ var _ = Describe("ClientCredentialsFlow", func() {
)
grant, err := provider.Authorize("test_audience")
- Expect(err).ToNot(HaveOccurred())
+ gomega.Expect(err).ToNot(gomega.HaveOccurred())
expected :=
convertToOAuth2Token(mockTokenExchanger.ReturnsTokens, mockClock)
- Expect(*grant.Token).To(Equal(expected))
+ gomega.Expect(*grant.Token).To(gomega.Equal(expected))
})
- It("returns an error if token exchanger errors", func() {
+ ginkgo.It("returns an error if token exchanger errors", func() {
mockTokenExchanger.ReturnsError =
errors.New("someerror")
mockTokenExchanger.ReturnsTokens = nil
@@ -120,19 +119,19 @@ var _ = Describe("ClientCredentialsFlow", func() {
)
_, err := provider.Authorize("test_audience")
- Expect(err.Error()).To(Equal("authentication failed
using client credentials: " +
+
gomega.Expect(err.Error()).To(gomega.Equal("authentication failed using client
credentials: " +
"could not exchange client credentials:
someerror"))
})
})
})
-var _ = Describe("ClientCredentialsGrantRefresher", func() {
+var _ = ginkgo.Describe("ClientCredentialsGrantRefresher", func() {
- Describe("Refresh", func() {
+ ginkgo.Describe("Refresh", func() {
var mockClock clock.Clock
var mockTokenExchanger *MockTokenExchanger
- BeforeEach(func() {
+ ginkgo.BeforeEach(func() {
mockClock = testing.NewFakeClock(time.Unix(0, 0))
expectedTokens := TokenResult{AccessToken:
"accessToken", RefreshToken: "refreshToken", ExpiresIn: 1234}
mockTokenExchanger = &MockTokenExchanger{
@@ -140,7 +139,7 @@ var _ = Describe("ClientCredentialsGrantRefresher", func() {
}
})
- It("invokes TokenExchanger with credentials", func() {
+ ginkgo.It("invokes TokenExchanger with credentials", func() {
refresher := &ClientCredentialsGrantRefresher{
clock: mockClock,
exchanger: mockTokenExchanger,
@@ -154,8 +153,8 @@ var _ = Describe("ClientCredentialsGrantRefresher", func() {
Scopes: []string{"profile"},
}
_, err := refresher.Refresh(og)
- Expect(err).ToNot(HaveOccurred())
-
Expect(mockTokenExchanger.CalledWithRequest).To(Equal(&ClientCredentialsExchangeRequest{
+ gomega.Expect(err).ToNot(gomega.HaveOccurred())
+
gomega.Expect(mockTokenExchanger.CalledWithRequest).To(gomega.Equal(&ClientCredentialsExchangeRequest{
TokenEndpoint: oidcEndpoints.TokenEndpoint,
ClientID: clientCredentials.ClientID,
ClientSecret: clientCredentials.ClientSecret,
@@ -164,7 +163,7 @@ var _ = Describe("ClientCredentialsGrantRefresher", func() {
}))
})
- It("returns a valid grant", func() {
+ ginkgo.It("returns a valid grant", func() {
refresher := &ClientCredentialsGrantRefresher{
clock: mockClock,
exchanger: mockTokenExchanger,
@@ -178,14 +177,14 @@ var _ = Describe("ClientCredentialsGrantRefresher",
func() {
Scopes: []string{"profile"},
}
ng, err := refresher.Refresh(og)
- Expect(err).ToNot(HaveOccurred())
- Expect(ng.Audience).To(Equal("test_audience"))
- Expect(ng.ClientID).To(Equal(""))
-
Expect(*ng.ClientCredentials).To(Equal(clientCredentials))
-
Expect(ng.TokenEndpoint).To(Equal(oidcEndpoints.TokenEndpoint))
+ gomega.Expect(err).ToNot(gomega.HaveOccurred())
+
gomega.Expect(ng.Audience).To(gomega.Equal("test_audience"))
+ gomega.Expect(ng.ClientID).To(gomega.Equal(""))
+
gomega.Expect(*ng.ClientCredentials).To(gomega.Equal(clientCredentials))
+
gomega.Expect(ng.TokenEndpoint).To(gomega.Equal(oidcEndpoints.TokenEndpoint))
expected :=
convertToOAuth2Token(mockTokenExchanger.ReturnsTokens, mockClock)
- Expect(*ng.Token).To(Equal(expected))
- Expect(ng.Scopes).To(Equal([]string{"profile"}))
+ gomega.Expect(*ng.Token).To(gomega.Equal(expected))
+
gomega.Expect(ng.Scopes).To(gomega.Equal([]string{"profile"}))
})
})
})
diff --git a/oauth2/clock/testing/fake_clock.go
b/oauth2/clock/testing/fake_clock.go
index 6fcbf4c0..1a0bacff 100644
--- a/oauth2/clock/testing/fake_clock.go
+++ b/oauth2/clock/testing/fake_clock.go
@@ -196,24 +196,24 @@ func (i *IntervalClock) Since(ts time.Time) time.Duration
{
// After is unimplemented, will panic.
// TODO: make interval clock use FakeClock so this can be implemented.
-func (*IntervalClock) After(d time.Duration) <-chan time.Time {
+func (*IntervalClock) After(_ time.Duration) <-chan time.Time {
panic("IntervalClock doesn't implement After")
}
// NewTimer is unimplemented, will panic.
// TODO: make interval clock use FakeClock so this can be implemented.
-func (*IntervalClock) NewTimer(d time.Duration) clock.Timer {
+func (*IntervalClock) NewTimer(_ time.Duration) clock.Timer {
panic("IntervalClock doesn't implement NewTimer")
}
// Tick is unimplemented, will panic.
// TODO: make interval clock use FakeClock so this can be implemented.
-func (*IntervalClock) Tick(d time.Duration) <-chan time.Time {
+func (*IntervalClock) Tick(_ time.Duration) <-chan time.Time {
panic("IntervalClock doesn't implement Tick")
}
// Sleep is unimplemented, will panic.
-func (*IntervalClock) Sleep(d time.Duration) {
+func (*IntervalClock) Sleep(_ time.Duration) {
panic("IntervalClock doesn't implement Sleep")
}
diff --git a/oauth2/config_tokenprovider_test.go
b/oauth2/config_tokenprovider_test.go
index d949a5a5..bfb56bfb 100644
--- a/oauth2/config_tokenprovider_test.go
+++ b/oauth2/config_tokenprovider_test.go
@@ -18,8 +18,8 @@
package oauth2
import (
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
+ "github.com/onsi/ginkgo"
+ "github.com/onsi/gomega"
)
type mockConfigProvider struct {
@@ -42,15 +42,15 @@ func (m *mockConfigProvider) SaveTokens(identifier,
accessToken, refreshToken st
m.SavedRefreshToken = refreshToken
}
-var _ = Describe("main", func() {
- Describe("configCachingProvider", func() {
- It("sets up the identifier using the clientID and audience",
func() {
+var _ = ginkgo.Describe("main", func() {
+ ginkgo.Describe("configCachingProvider", func() {
+ ginkgo.It("sets up the identifier using the clientID and
audience", func() {
p := NewConfigBackedCachingProvider("iamclientid",
"iamaudience", &mockConfigProvider{})
-
Expect(p.identifier).To(Equal("iamclientid-iamaudience"))
+
gomega.Expect(p.identifier).To(gomega.Equal("iamclientid-iamaudience"))
})
- It("gets tokens from the config provider", func() {
+ ginkgo.It("gets tokens from the config provider", func() {
c := &mockConfigProvider{
ReturnAccessToken: "accessToken",
ReturnRefreshToken: "refreshToken",
@@ -62,15 +62,15 @@ var _ = Describe("main", func() {
r, err := p.GetTokens()
- Expect(err).NotTo(HaveOccurred())
-
Expect(c.GetTokensCalledIdentifier).To(Equal(p.identifier))
- Expect(r).To(Equal(&TokenResult{
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
gomega.Expect(c.GetTokensCalledIdentifier).To(gomega.Equal(p.identifier))
+ gomega.Expect(r).To(gomega.Equal(&TokenResult{
AccessToken: c.ReturnAccessToken,
RefreshToken: c.ReturnRefreshToken,
}))
})
- It("caches the tokens in the config provider", func() {
+ ginkgo.It("caches the tokens in the config provider", func() {
c := &mockConfigProvider{}
p := ConfigBackedCachingProvider{
identifier: "iamidentifier",
@@ -83,9 +83,9 @@ var _ = Describe("main", func() {
p.CacheTokens(toSave)
- Expect(c.SavedIdentifier).To(Equal(p.identifier))
- Expect(c.SavedAccessToken).To(Equal(toSave.AccessToken))
-
Expect(c.SavedRefreshToken).To(Equal(toSave.RefreshToken))
+
gomega.Expect(c.SavedIdentifier).To(gomega.Equal(p.identifier))
+
gomega.Expect(c.SavedAccessToken).To(gomega.Equal(toSave.AccessToken))
+
gomega.Expect(c.SavedRefreshToken).To(gomega.Equal(toSave.RefreshToken))
})
})
})
diff --git a/oauth2/device_code_flow_test.go b/oauth2/device_code_flow_test.go
index a238a484..deb09003 100644
--- a/oauth2/device_code_flow_test.go
+++ b/oauth2/device_code_flow_test.go
@@ -25,8 +25,8 @@ import (
"github.com/apache/pulsar-client-go/oauth2/clock/testing"
"golang.org/x/oauth2"
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
+ "github.com/onsi/ginkgo"
+ "github.com/onsi/gomega"
)
type MockDeviceCodeProvider struct {
@@ -59,9 +59,9 @@ func (c *MockDeviceCodeCallback) Callback(code
*DeviceCodeResult) error {
return nil
}
-var _ = Describe("DeviceCodeFlow", func() {
+var _ = ginkgo.Describe("DeviceCodeFlow", func() {
- Describe("Authorize", func() {
+ ginkgo.Describe("Authorize", func() {
const audience = "test_clientID"
var mockClock clock.Clock
@@ -70,7 +70,7 @@ var _ = Describe("DeviceCodeFlow", func() {
var mockCallback *MockDeviceCodeCallback
var flow *DeviceCodeFlow
- BeforeEach(func() {
+ ginkgo.BeforeEach(func() {
mockClock = testing.NewFakeClock(time.Unix(0, 0))
mockCodeProvider = &MockDeviceCodeProvider{
@@ -107,21 +107,21 @@ var _ = Describe("DeviceCodeFlow", func() {
)
})
- It("invokes DeviceCodeProvider", func() {
+ ginkgo.It("invokes DeviceCodeProvider", func() {
_, _ = flow.Authorize(audience)
- Expect(mockCodeProvider.Called).To(BeTrue())
-
Expect(mockCodeProvider.CalledWithAdditionalScopes).To(ContainElement("offline_access"))
+
gomega.Expect(mockCodeProvider.Called).To(gomega.BeTrue())
+
gomega.Expect(mockCodeProvider.CalledWithAdditionalScopes).To(gomega.ContainElement("offline_access"))
})
- It("invokes callback with returned code", func() {
+ ginkgo.It("invokes callback with returned code", func() {
_, _ = flow.Authorize(audience)
- Expect(mockCallback.Called).To(BeTrue())
-
Expect(mockCallback.DeviceCodeResult).To(Equal(mockCodeProvider.DeviceCodeResult))
+ gomega.Expect(mockCallback.Called).To(gomega.BeTrue())
+
gomega.Expect(mockCallback.DeviceCodeResult).To(gomega.Equal(mockCodeProvider.DeviceCodeResult))
})
- It("invokes TokenExchanger with returned code", func() {
+ ginkgo.It("invokes TokenExchanger with returned code", func() {
_, _ = flow.Authorize(audience)
-
Expect(mockTokenExchanger.CalledWithRequest).To(Equal(&DeviceCodeExchangeRequest{
+
gomega.Expect(mockTokenExchanger.CalledWithRequest).To(gomega.Equal(&DeviceCodeExchangeRequest{
TokenEndpoint: oidcEndpoints.TokenEndpoint,
ClientID: "test_clientID",
PollInterval: time.Duration(5) * time.Second,
@@ -129,28 +129,28 @@ var _ = Describe("DeviceCodeFlow", func() {
}))
})
- It("returns an authorization grant", func() {
+ ginkgo.It("returns an authorization grant", func() {
grant, _ := flow.Authorize(audience)
- Expect(grant).ToNot(BeNil())
- Expect(grant.Audience).To(Equal(audience))
- Expect(grant.ClientID).To(Equal("test_clientID"))
- Expect(grant.ClientCredentials).To(BeNil())
-
Expect(grant.TokenEndpoint).To(Equal(oidcEndpoints.TokenEndpoint))
+ gomega.Expect(grant).ToNot(gomega.BeNil())
+ gomega.Expect(grant.Audience).To(gomega.Equal(audience))
+
gomega.Expect(grant.ClientID).To(gomega.Equal("test_clientID"))
+
gomega.Expect(grant.ClientCredentials).To(gomega.BeNil())
+
gomega.Expect(grant.TokenEndpoint).To(gomega.Equal(oidcEndpoints.TokenEndpoint))
expected :=
convertToOAuth2Token(mockTokenExchanger.ReturnsTokens, mockClock)
- Expect(*grant.Token).To(Equal(expected))
+ gomega.Expect(*grant.Token).To(gomega.Equal(expected))
})
})
})
-var _ = Describe("DeviceAuthorizationGrantRefresher", func() {
+var _ = ginkgo.Describe("DeviceAuthorizationGrantRefresher", func() {
- Describe("Refresh", func() {
+ ginkgo.Describe("Refresh", func() {
var mockClock clock.Clock
var mockTokenExchanger *MockTokenExchanger
var refresher *DeviceAuthorizationGrantRefresher
var grant *AuthorizationGrant
- BeforeEach(func() {
+ ginkgo.BeforeEach(func() {
mockClock = testing.NewFakeClock(time.Unix(0, 0))
mockTokenExchanger = &MockTokenExchanger{}
@@ -169,62 +169,62 @@ var _ = Describe("DeviceAuthorizationGrantRefresher",
func() {
}
})
- It("invokes the token exchanger", func() {
+ ginkgo.It("invokes the token exchanger", func() {
mockTokenExchanger.ReturnsTokens = &TokenResult{
AccessToken: "new token",
}
_, _ = refresher.Refresh(grant)
-
Expect(*mockTokenExchanger.RefreshCalledWithRequest).To(Equal(RefreshTokenExchangeRequest{
+
gomega.Expect(*mockTokenExchanger.RefreshCalledWithRequest).To(gomega.Equal(RefreshTokenExchangeRequest{
TokenEndpoint: oidcEndpoints.TokenEndpoint,
ClientID: grant.ClientID,
RefreshToken: "grt",
}))
})
- It("returns the refreshed access token from the
TokenExchanger", func() {
+ ginkgo.It("returns the refreshed access token from the
TokenExchanger", func() {
mockTokenExchanger.ReturnsTokens = &TokenResult{
AccessToken: "new token",
}
grant, _ = refresher.Refresh(grant)
-
Expect(grant.Token.AccessToken).To(Equal(mockTokenExchanger.ReturnsTokens.AccessToken))
+
gomega.Expect(grant.Token.AccessToken).To(gomega.Equal(mockTokenExchanger.ReturnsTokens.AccessToken))
})
- It("preserves the existing refresh token from the
TokenExchanger", func() {
+ ginkgo.It("preserves the existing refresh token from the
TokenExchanger", func() {
mockTokenExchanger.ReturnsTokens = &TokenResult{
AccessToken: "new token",
}
grant, _ = refresher.Refresh(grant)
- Expect(grant.Token.RefreshToken).To(Equal("grt"))
+
gomega.Expect(grant.Token.RefreshToken).To(gomega.Equal("grt"))
})
- It("returns the refreshed refresh token from the
TokenExchanger", func() {
+ ginkgo.It("returns the refreshed refresh token from the
TokenExchanger", func() {
mockTokenExchanger.ReturnsTokens = &TokenResult{
AccessToken: "new token",
RefreshToken: "new token",
}
grant, _ = refresher.Refresh(grant)
- Expect(grant.Token.RefreshToken).To(Equal("new token"))
+
gomega.Expect(grant.Token.RefreshToken).To(gomega.Equal("new token"))
})
- It("returns a meaningful expiration time", func() {
+ ginkgo.It("returns a meaningful expiration time", func() {
mockTokenExchanger.ReturnsTokens = &TokenResult{
AccessToken: "new token",
ExpiresIn: 60,
}
grant, _ = refresher.Refresh(grant)
-
Expect(grant.Token.Expiry).To(Equal(mockClock.Now().Add(time.Duration(60) *
time.Second)))
+
gomega.Expect(grant.Token.Expiry).To(gomega.Equal(mockClock.Now().Add(time.Duration(60)
* time.Second)))
})
- It("returns an error when TokenExchanger does", func() {
+ ginkgo.It("returns an error when TokenExchanger does", func() {
mockTokenExchanger.ReturnsError =
errors.New("someerror")
_, err := refresher.Refresh(grant)
- Expect(err.Error()).To(Equal("could not exchange
refresh token: someerror"))
+ gomega.Expect(err.Error()).To(gomega.Equal("could not
exchange refresh token: someerror"))
})
})
})
diff --git a/oauth2/oidc_endpoint_provider_test.go
b/oauth2/oidc_endpoint_provider_test.go
index 065f9277..307f0a9e 100644
--- a/oauth2/oidc_endpoint_provider_test.go
+++ b/oauth2/oidc_endpoint_provider_test.go
@@ -22,17 +22,17 @@ import (
"net/http"
"net/http/httptest"
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
+ "github.com/onsi/ginkgo"
+ "github.com/onsi/gomega"
)
-var _ = Describe("GetOIDCWellKnownEndpointsFromIssuerURL", func() {
- It("calls and gets the well known data from the correct endpoint for
the issuer", func() {
+var _ = ginkgo.Describe("GetOIDCWellKnownEndpointsFromIssuerURL", func() {
+ ginkgo.It("calls and gets the well known data from the correct endpoint
for the issuer", func() {
var req *http.Request
wkEndpointsResp := OIDCWellKnownEndpoints{
AuthorizationEndpoint: "the-auth-endpoint",
TokenEndpoint: "the-token-endpoint"}
responseBytes, err := json.Marshal(wkEndpointsResp)
- Expect(err).ToNot(HaveOccurred())
+ gomega.Expect(err).ToNot(gomega.HaveOccurred())
ts := httptest.NewServer(http.HandlerFunc(func(w
http.ResponseWriter, r *http.Request) {
req = r
@@ -45,31 +45,31 @@ var _ = Describe("GetOIDCWellKnownEndpointsFromIssuerURL",
func() {
endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL(ts.URL)
- Expect(err).ToNot(HaveOccurred())
- Expect(*endpoints).To(Equal(wkEndpointsResp))
-
Expect(req.URL.Path).To(Equal("/.well-known/openid-configuration"))
+ gomega.Expect(err).ToNot(gomega.HaveOccurred())
+ gomega.Expect(*endpoints).To(gomega.Equal(wkEndpointsResp))
+
gomega.Expect(req.URL.Path).To(gomega.Equal("/.well-known/openid-configuration"))
})
- It("errors when url.Parse errors", func() {
+ ginkgo.It("errors when url.Parse errors", func() {
endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL("://")
- Expect(err).To(HaveOccurred())
- Expect(err.Error()).To(Equal(
+ gomega.Expect(err).To(gomega.HaveOccurred())
+ gomega.Expect(err.Error()).To(gomega.Equal(
"could not parse issuer url to build well known
endpoints: parse \"://\": missing protocol scheme"))
- Expect(endpoints).To(BeNil())
+ gomega.Expect(endpoints).To(gomega.BeNil())
})
- It("errors when the get errors", func() {
+ ginkgo.It("errors when the get errors", func() {
endpoints, err :=
GetOIDCWellKnownEndpointsFromIssuerURL("https://")
- Expect(err).To(HaveOccurred())
- Expect(err.Error()).To(Equal(
+ gomega.Expect(err).To(gomega.HaveOccurred())
+ gomega.Expect(err.Error()).To(gomega.Equal(
"could not get well known endpoints from url
https://.well-known/openid-configuration: " +
"Get
\"https://.well-known/openid-configuration\": dial tcp: lookup .well-known: no
such host"))
- Expect(endpoints).To(BeNil())
+ gomega.Expect(endpoints).To(gomega.BeNil())
})
- It("errors when the json decoder errors", func() {
+ ginkgo.It("errors when the json decoder errors", func() {
var req *http.Request
ts := httptest.NewServer(http.HandlerFunc(func(w
http.ResponseWriter, r *http.Request) {
@@ -83,10 +83,10 @@ var _ = Describe("GetOIDCWellKnownEndpointsFromIssuerURL",
func() {
endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL(ts.URL)
- Expect(err).To(HaveOccurred())
- Expect(err.Error()).To(Equal("could not decode json body when
getting well" +
+ gomega.Expect(err).To(gomega.HaveOccurred())
+ gomega.Expect(err.Error()).To(gomega.Equal("could not decode
json body when getting well" +
" known endpoints: invalid character '<' looking for
beginning of value"))
- Expect(endpoints).To(BeNil())
-
Expect(req.URL.Path).To(Equal("/.well-known/openid-configuration"))
+ gomega.Expect(endpoints).To(gomega.BeNil())
+
gomega.Expect(req.URL.Path).To(gomega.Equal("/.well-known/openid-configuration"))
})
})
diff --git a/perf/perf-consumer.go b/perf/perf-consumer.go
index 2172af53..5001d8d1 100644
--- a/perf/perf-consumer.go
+++ b/perf/perf-consumer.go
@@ -46,7 +46,7 @@ func newConsumerCommand() *cobra.Command {
Use: "consume <topic>",
Short: "Consume from topic",
Args: cobra.ExactArgs(1),
- Run: func(cmd *cobra.Command, args []string) {
+ Run: func(_ *cobra.Command, args []string) {
stop := stopCh()
if FlagProfile {
RunProfiling(stop)
diff --git a/perf/perf-producer.go b/perf/perf-producer.go
index f8e14d22..da0a499e 100644
--- a/perf/perf-producer.go
+++ b/perf/perf-producer.go
@@ -49,7 +49,7 @@ func newProducerCommand() *cobra.Command {
Use: "produce ",
Short: "Produce on a topic and measure performance",
Args: cobra.ExactArgs(1),
- Run: func(cmd *cobra.Command, args []string) {
+ Run: func(_ *cobra.Command, args []string) {
stop := stopCh()
if FlagProfile {
RunProfiling(stop)
@@ -125,7 +125,7 @@ func produce(produceArgs *ProduceArgs, stop <-chan
struct{}) {
producer.SendAsync(ctx, &pulsar.ProducerMessage{
Payload: payload,
- }, func(msgID pulsar.MessageID, message
*pulsar.ProducerMessage, e error) {
+ }, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage,
e error) {
if e != nil {
log.WithError(e).Fatal("Failed to
publish")
}
diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go
index 40257e83..701ef412 100644
--- a/perf/pulsar-perf-go.go
+++ b/perf/pulsar-perf-go.go
@@ -88,7 +88,7 @@ func initLogger(debug bool) {
func main() {
rootCmd := &cobra.Command{
- PersistentPreRun: func(cmd *cobra.Command, args []string) {
+ PersistentPreRun: func(_ *cobra.Command, _ []string) {
initLogger(flagDebug)
},
Use: "pulsar-perf-go",
diff --git a/pulsar/ack_grouping_tracker.go b/pulsar/ack_grouping_tracker.go
index 97d9e05a..c4ecc003 100644
--- a/pulsar/ack_grouping_tracker.go
+++ b/pulsar/ack_grouping_tracker.go
@@ -97,7 +97,7 @@ func (i *immediateAckGroupingTracker) addCumulative(id
MessageID) {
i.ackCumulative(id)
}
-func (i *immediateAckGroupingTracker) isDuplicate(id MessageID) bool {
+func (i *immediateAckGroupingTracker) isDuplicate(_ MessageID) bool {
return false
}
diff --git a/pulsar/ack_grouping_tracker_test.go
b/pulsar/ack_grouping_tracker_test.go
index 1fe5a568..57adf888 100644
--- a/pulsar/ack_grouping_tracker_test.go
+++ b/pulsar/ack_grouping_tracker_test.go
@@ -190,8 +190,8 @@ func TestTimedTrackerCumulativeAck(t *testing.T) {
}
func TestTimedTrackerIsDuplicate(t *testing.T) {
- tracker := newAckGroupingTracker(nil, func(id MessageID) {}, func(id
MessageID) {},
- func(id []*pb.MessageIdData) {})
+ tracker := newAckGroupingTracker(nil, func(_ MessageID) {}, func(_
MessageID) {},
+ func(_ []*pb.MessageIdData) {})
tracker.add(&messageID{batchIdx: 0, batchSize: 3})
tracker.add(&messageID{batchIdx: 2, batchSize: 3})
diff --git a/pulsar/auth/athenz_test.go b/pulsar/auth/athenz_test.go
index 89118081..a4d1fe5d 100644
--- a/pulsar/auth/athenz_test.go
+++ b/pulsar/auth/athenz_test.go
@@ -48,13 +48,13 @@ type MockRoleToken struct {
mock.Mock
}
-func (m *MockTokenBuilder) SetExpiration(t time.Duration) {
+func (m *MockTokenBuilder) SetExpiration(_ time.Duration) {
}
-func (m *MockTokenBuilder) SetHostname(h string) {
+func (m *MockTokenBuilder) SetHostname(_ string) {
}
-func (m *MockTokenBuilder) SetIPAddress(ip string) {
+func (m *MockTokenBuilder) SetIPAddress(_ string) {
}
-func (m *MockTokenBuilder) SetKeyService(keyService string) {
+func (m *MockTokenBuilder) SetKeyService(_ string) {
}
func (m *MockTokenBuilder) Token() zms.Token {
result := m.Called()
diff --git a/pulsar/auth/disabled.go b/pulsar/auth/disabled.go
index 0389a39b..4614cbdf 100644
--- a/pulsar/auth/disabled.go
+++ b/pulsar/auth/disabled.go
@@ -49,7 +49,7 @@ func (disabled) Close() error {
return nil
}
-func (d disabled) RoundTrip(req *http.Request) (*http.Response, error) {
+func (d disabled) RoundTrip(_ *http.Request) (*http.Response, error) {
return nil, nil
}
@@ -57,6 +57,6 @@ func (d disabled) Transport() http.RoundTripper {
return nil
}
-func (d disabled) WithTransport(tripper http.RoundTripper) error {
+func (d disabled) WithTransport(_ http.RoundTripper) error {
return nil
}
diff --git a/pulsar/auth/oauth2_test.go b/pulsar/auth/oauth2_test.go
index 86d36404..cc3d11c1 100644
--- a/pulsar/auth/oauth2_test.go
+++ b/pulsar/auth/oauth2_test.go
@@ -35,7 +35,7 @@ func mockOAuthServer() *httptest.Server {
// mock the used REST path for the tests
mockedHandler := http.NewServeMux()
- mockedHandler.HandleFunc("/.well-known/openid-configuration",
func(writer http.ResponseWriter, request *http.Request) {
+ mockedHandler.HandleFunc("/.well-known/openid-configuration",
func(writer http.ResponseWriter, _ *http.Request) {
s := fmt.Sprintf(`{
"issuer":"%s",
"authorization_endpoint":"%s/authorize",
@@ -44,10 +44,10 @@ func mockOAuthServer() *httptest.Server {
}`, server.URL, server.URL, server.URL, server.URL)
fmt.Fprintln(writer, s)
})
- mockedHandler.HandleFunc("/oauth/token", func(writer
http.ResponseWriter, request *http.Request) {
+ mockedHandler.HandleFunc("/oauth/token", func(writer
http.ResponseWriter, _ *http.Request) {
fmt.Fprintln(writer, "{\n \"access_token\":
\"token-content\",\n \"token_type\": \"Bearer\"\n}")
})
- mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter,
request *http.Request) {
+ mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter,
_ *http.Request) {
fmt.Fprintln(writer, "true")
})
diff --git a/pulsar/auth/token.go b/pulsar/auth/token.go
index 898b6537..1d7fcff1 100644
--- a/pulsar/auth/token.go
+++ b/pulsar/auth/token.go
@@ -38,9 +38,8 @@ func NewAuthenticationTokenWithParams(params
map[string]string) (Provider, error
return NewAuthenticationToken(params["token"]), nil
} else if params["file"] != "" {
return NewAuthenticationTokenFromFile(params["file"]), nil
- } else {
- return nil, errors.New("missing configuration for token auth")
}
+ return nil, errors.New("missing configuration for token auth")
}
// NewAuthenticationToken returns a token auth provider that will use the
specified token to
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index 5b6b8f1f..0fdf6be7 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -257,7 +257,7 @@ func mockOAuthServer() *httptest.Server {
// mock the used REST path for the tests
mockedHandler := http.NewServeMux()
- mockedHandler.HandleFunc("/.well-known/openid-configuration",
func(writer http.ResponseWriter, request *http.Request) {
+ mockedHandler.HandleFunc("/.well-known/openid-configuration",
func(writer http.ResponseWriter, _ *http.Request) {
s := fmt.Sprintf(`{
"issuer":"%s",
"authorization_endpoint":"%s/authorize",
@@ -266,13 +266,13 @@ func mockOAuthServer() *httptest.Server {
}`, server.URL, server.URL, server.URL, server.URL)
fmt.Fprintln(writer, s)
})
- mockedHandler.HandleFunc("/oauth/token", func(writer
http.ResponseWriter, request *http.Request) {
+ mockedHandler.HandleFunc("/oauth/token", func(writer
http.ResponseWriter, _ *http.Request) {
fmt.Fprintln(writer, "{\n"+
" \"access_token\":
\"eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0b2tlbi1wcmluY2lwYWwifQ."+
"tSfgR8l7dKC6LoWCxQgNkuSB8our7xV_nAM7wpgCbG4\",\n"+
" \"token_type\": \"Bearer\"\n}")
})
- mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter,
request *http.Request) {
+ mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter,
_ *http.Request) {
fmt.Fprintln(writer, "true")
})
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index a3d3e3ff..92376ac1 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -19,6 +19,7 @@ package pulsar
import (
"context"
+ "errors"
"fmt"
"math/rand"
"strconv"
@@ -492,7 +493,7 @@ func (c *consumer) unsubscribe(force bool) error {
}
}
if errMsg != "" {
- return fmt.Errorf(errMsg)
+ return errors.New(errMsg)
}
return nil
}
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 020e38a0..3030bda3 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -294,11 +294,11 @@ func (c *multiTopicConsumer) Close() {
})
}
-func (c *multiTopicConsumer) Seek(msgID MessageID) error {
+func (c *multiTopicConsumer) Seek(_ MessageID) error {
return newError(SeekFailed, "seek command not allowed for multi topic
consumer")
}
-func (c *multiTopicConsumer) SeekByTime(time time.Time) error {
+func (c *multiTopicConsumer) SeekByTime(_ time.Time) error {
return newError(SeekFailed, "seek command not allowed for multi topic
consumer")
}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 831f763a..0e8274e6 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -786,18 +786,18 @@ func (pc *partitionConsumer) NackMsg(msg Message) {
pc.metrics.NacksCounter.Inc()
}
-func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
+func (pc *partitionConsumer) Redeliver(msgIDs []messageID) {
if state := pc.getConsumerState(); state == consumerClosed || state ==
consumerClosing {
pc.log.WithField("state", state).Error("Failed to redeliver
closing or closed consumer")
return
}
- pc.eventsCh <- &redeliveryRequest{msgIds}
+ pc.eventsCh <- &redeliveryRequest{msgIDs}
- iMsgIds := make([]MessageID, len(msgIds))
- for i := range iMsgIds {
- iMsgIds[i] = &msgIds[i]
+ iMsgIDs := make([]MessageID, len(msgIDs))
+ for i := range iMsgIDs {
+ iMsgIDs[i] = &msgIDs[i]
}
- pc.options.interceptors.OnNegativeAcksSend(pc.parentConsumer, iMsgIds)
+ pc.options.interceptors.OnNegativeAcksSend(pc.parentConsumer, iMsgIDs)
}
func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
@@ -805,14 +805,14 @@ func (pc *partitionConsumer) internalRedeliver(req
*redeliveryRequest) {
pc.log.WithField("state", state).Error("Failed to redeliver
closing or closed consumer")
return
}
- msgIds := req.msgIds
- pc.log.Debug("Request redelivery after negative ack for messages",
msgIds)
+ msgIDs := req.msgIDs
+ pc.log.Debug("Request redelivery after negative ack for messages",
msgIDs)
- msgIDDataList := make([]*pb.MessageIdData, len(msgIds))
- for i := 0; i < len(msgIds); i++ {
+ msgIDDataList := make([]*pb.MessageIdData, len(msgIDs))
+ for i := 0; i < len(msgIDs); i++ {
msgIDDataList[i] = &pb.MessageIdData{
- LedgerId: proto.Uint64(uint64(msgIds[i].ledgerID)),
- EntryId: proto.Uint64(uint64(msgIds[i].entryID)),
+ LedgerId: proto.Uint64(uint64(msgIDs[i].ledgerID)),
+ EntryId: proto.Uint64(uint64(msgIDs[i].entryID)),
}
}
@@ -1560,7 +1560,7 @@ type closeRequest struct {
}
type redeliveryRequest struct {
- msgIds []messageID
+ msgIDs []messageID
}
type getLastMsgIDRequest struct {
@@ -1906,10 +1906,9 @@ func (pc *partitionConsumer) clearReceiverQueue()
*trackingMessageID {
// If the queue was empty we need to restart from the message
just after the last one that has been dequeued
// in the past
return pc.lastDequeuedMsg
- } else {
- // No message was received or dequeued by this consumer. Next
message would still be the startMessageId
- return pc.startMessageID.get()
}
+ // No message was received or dequeued by this consumer. Next message
would still be the startMessageId
+ return pc.startMessageID.get()
}
func getPreviousMessage(mid *trackingMessageID) *trackingMessageID {
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index a4a8d995..37cc9632 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -982,7 +982,7 @@ func TestConsumerBatchCumulativeAck(t *testing.T) {
wg.Add(1)
producer.SendAsync(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", i))},
- func(id MessageID, producerMessage *ProducerMessage, e
error) {
+ func(_ MessageID, _ *ProducerMessage, e error) {
assert.NoError(t, e)
wg.Done()
})
@@ -998,7 +998,7 @@ func TestConsumerBatchCumulativeAck(t *testing.T) {
wg.Add(1)
producer.SendAsync(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", i))},
- func(id MessageID, producerMessage *ProducerMessage, e
error) {
+ func(_ MessageID, _ *ProducerMessage, e error) {
assert.NoError(t, e)
wg.Done()
})
@@ -2301,7 +2301,7 @@ func TestConsumerAddTopicPartitions(t *testing.T) {
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
- MessageRouter: func(msg *ProducerMessage, topicMetadata
TopicMetadata) int {
+ MessageRouter: func(msg *ProducerMessage, _ TopicMetadata) int {
// The message key will contain the partition id where
to route
i, err := strconv.Atoi(msg.Key)
assert.NoError(t, err)
@@ -2422,11 +2422,11 @@ func TestProducerName(t *testing.T) {
type noopConsumerInterceptor struct{}
-func (noopConsumerInterceptor) BeforeConsume(message ConsumerMessage) {}
+func (noopConsumerInterceptor) BeforeConsume(_ ConsumerMessage) {}
-func (noopConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID
MessageID) {}
+func (noopConsumerInterceptor) OnAcknowledge(_ Consumer, _ MessageID) {}
-func (noopConsumerInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs
[]MessageID) {}
+func (noopConsumerInterceptor) OnNegativeAcksSend(_ Consumer, _ []MessageID) {}
// copyPropertyInterceptor copy all keys in message properties map and add a
suffix
type copyPropertyInterceptor struct {
@@ -2435,31 +2435,31 @@ type copyPropertyInterceptor struct {
func (x copyPropertyInterceptor) BeforeConsume(message ConsumerMessage) {
properties := message.Properties()
- copy := make(map[string]string, len(properties))
+ cp := make(map[string]string, len(properties))
for k, v := range properties {
- copy[k+x.suffix] = v
+ cp[k+x.suffix] = v
}
- for ck, v := range copy {
+ for ck, v := range cp {
properties[ck] = v
}
}
-func (copyPropertyInterceptor) OnAcknowledge(consumer Consumer, msgID
MessageID) {}
+func (copyPropertyInterceptor) OnAcknowledge(_ Consumer, _ MessageID) {}
-func (copyPropertyInterceptor) OnNegativeAcksSend(consumer Consumer, msgIDs
[]MessageID) {}
+func (copyPropertyInterceptor) OnNegativeAcksSend(_ Consumer, _ []MessageID) {}
type metricConsumerInterceptor struct {
ackn int32
nackn int32
}
-func (x *metricConsumerInterceptor) BeforeConsume(message ConsumerMessage) {}
+func (x *metricConsumerInterceptor) BeforeConsume(_ ConsumerMessage) {}
-func (x *metricConsumerInterceptor) OnAcknowledge(consumer Consumer, msgID
MessageID) {
+func (x *metricConsumerInterceptor) OnAcknowledge(_ Consumer, _ MessageID) {
atomic.AddInt32(&x.ackn, 1)
}
-func (x *metricConsumerInterceptor) OnNegativeAcksSend(consumer Consumer,
msgIDs []MessageID) {
+func (x *metricConsumerInterceptor) OnNegativeAcksSend(_ Consumer, msgIDs
[]MessageID) {
atomic.AddInt32(&x.nackn, int32(len(msgIDs)))
}
@@ -2512,7 +2512,7 @@ func TestConsumerWithInterceptors(t *testing.T) {
}
}
- var nackIds []MessageID
+ var nackIDs []MessageID
// receive 10 messages
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
@@ -2533,13 +2533,13 @@ func TestConsumerWithInterceptors(t *testing.T) {
if i%2 == 0 {
consumer.Ack(msg)
} else {
- nackIds = append(nackIds, msg.ID())
+ nackIDs = append(nackIDs, msg.ID())
}
}
assert.Equal(t, int32(5), atomic.LoadInt32(&metric.ackn))
- for i := range nackIds {
- consumer.NackID(nackIds[i])
+ for i := range nackIDs {
+ consumer.NackID(nackIDs[i])
}
// receive 5 nack messages
@@ -2633,7 +2633,7 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t
*testing.T) {
producer.SendAsync(ctx, &ProducerMessage{
Key: k,
Payload: []byte(fmt.Sprintf("value-%d", i)),
- }, func(id MessageID, producerMessage *ProducerMessage,
err error) {
+ }, func(_ MessageID, _ *ProducerMessage, err error) {
assert.Nil(t, err)
},
)
@@ -2728,7 +2728,7 @@ func
TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
producer.SendAsync(ctx, &ProducerMessage{
Key: k,
Payload: []byte(fmt.Sprintf("value-%d", i)),
- }, func(id MessageID, producerMessage *ProducerMessage,
err error) {
+ }, func(_ MessageID, _ *ProducerMessage, err error) {
assert.Nil(t, err)
},
)
@@ -2762,7 +2762,7 @@ func
TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
Key: u.String(),
OrderingKey: k,
Payload: []byte(fmt.Sprintf("value-%d", i)),
- }, func(id MessageID, producerMessage *ProducerMessage,
err error) {
+ }, func(_ MessageID, _ *ProducerMessage, err error) {
assert.Nil(t, err)
},
)
@@ -3511,12 +3511,12 @@ func NewEncKeyReader(publicKeyPath, privateKeyPath
string) *EncKeyReader {
}
// GetPublicKey read public key from the given path
-func (d *EncKeyReader) PublicKey(keyName string, keyMeta map[string]string)
(*crypto.EncryptionKeyInfo, error) {
+func (d *EncKeyReader) PublicKey(keyName string, _ map[string]string)
(*crypto.EncryptionKeyInfo, error) {
return readKey(keyName, d.publicKeyPath, d.metaMap)
}
// GetPrivateKey read private key from the given path
-func (d *EncKeyReader) PrivateKey(keyName string, keyMeta map[string]string)
(*crypto.EncryptionKeyInfo, error) {
+func (d *EncKeyReader) PrivateKey(keyName string, _ map[string]string)
(*crypto.EncryptionKeyInfo, error) {
return readKey(keyName, d.privateKeyPath, d.metaMap)
}
@@ -4022,31 +4022,31 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse
bool, cumulative bool, o
for i := 0; i < BatchingMaxSize; i++ {
producer.SendAsync(context.Background(), &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-%d", i)),
- }, func(id MessageID, producerMessage *ProducerMessage, err
error) {
+ }, func(id MessageID, _ *ProducerMessage, err error) {
assert.Nil(t, err)
log.Printf("Sent to %v:%d:%d", id, id.BatchIdx(),
id.BatchSize())
})
}
assert.Nil(t, producer.FlushWithCtx(context.Background()))
- msgIds := make([]MessageID, BatchingMaxSize)
+ msgIDs := make([]MessageID, BatchingMaxSize)
for i := 0; i < BatchingMaxSize; i++ {
message, err := consumer.Receive(context.Background())
assert.Nil(t, err)
- msgIds[i] = message.ID()
+ msgIDs[i] = message.ID()
log.Printf("Received %v from %v:%d:%d",
string(message.Payload()), message.ID(),
message.ID().BatchIdx(), message.ID().BatchSize())
}
// Acknowledge half of the messages
if cumulative {
- msgID := msgIds[BatchingMaxSize/2-1]
+ msgID := msgIDs[BatchingMaxSize/2-1]
err := consumer.AckIDCumulative(msgID)
assert.Nil(t, err)
log.Printf("Acknowledge %v:%d cumulatively\n", msgID,
msgID.BatchIdx())
} else {
for i := 0; i < BatchingMaxSize; i++ {
- msgID := msgIds[i]
+ msgID := msgIDs[i]
if i%2 == 0 {
consumer.AckID(msgID)
log.Printf("Acknowledge %v:%d\n", msgID,
msgID.BatchIdx())
@@ -4066,17 +4066,17 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse
bool, cumulative bool, o
index = i + BatchingMaxSize/2
}
assert.Equal(t, []byte(fmt.Sprintf("msg-%d", index)),
message.Payload())
- assert.Equal(t, msgIds[index].BatchIdx(),
message.ID().BatchIdx())
+ assert.Equal(t, msgIDs[index].BatchIdx(),
message.ID().BatchIdx())
// We should not acknowledge message.ID() here because
message.ID() shares a different
// tracker with msgIds
if !cumulative {
- msgID := msgIds[index]
+ msgID := msgIDs[index]
consumer.AckID(msgID)
log.Printf("Acknowledge %v:%d\n", msgID,
msgID.BatchIdx())
}
}
if cumulative {
- msgID := msgIds[BatchingMaxSize-1]
+ msgID := msgIDs[BatchingMaxSize-1]
err := consumer.AckIDCumulative(msgID)
assert.Nil(t, err)
log.Printf("Acknowledge %v:%d cumulatively\n", msgID,
msgID.BatchIdx())
@@ -4167,7 +4167,7 @@ func TestConsumerWithAutoScaledQueueReceive(t *testing.T)
{
p.SendAsync(
context.Background(),
&ProducerMessage{Payload: []byte("hello")},
- func(id MessageID, producerMessage *ProducerMessage,
err error) {
+ func(_ MessageID, _ *ProducerMessage, _ error) {
},
)
}
@@ -4333,7 +4333,7 @@ func TestConsumerMemoryLimit(t *testing.T) {
p1.SendAsync(
context.Background(),
&ProducerMessage{Payload: createTestMessagePayload(1)},
- func(id MessageID, producerMessage *ProducerMessage,
err error) {
+ func(_ MessageID, _ *ProducerMessage, _ error) {
},
)
}
@@ -4373,7 +4373,7 @@ func TestConsumerMemoryLimit(t *testing.T) {
p1.SendAsync(
context.Background(),
&ProducerMessage{Payload: createTestMessagePayload(1)},
- func(id MessageID, producerMessage *ProducerMessage,
err error) {
+ func(_ MessageID, _ *ProducerMessage, _ error) {
},
)
}
@@ -4456,7 +4456,7 @@ func TestMultiConsumerMemoryLimit(t *testing.T) {
p1.SendAsync(
context.Background(),
&ProducerMessage{Payload: createTestMessagePayload(1)},
- func(id MessageID, producerMessage *ProducerMessage,
err error) {
+ func(_ MessageID, _ *ProducerMessage, _ error) {
},
)
}
diff --git a/pulsar/crypto/default_message_crypto.go
b/pulsar/crypto/default_message_crypto.go
index 74e4592d..2239fa39 100644
--- a/pulsar/crypto/default_message_crypto.go
+++ b/pulsar/crypto/default_message_crypto.go
@@ -169,7 +169,7 @@ func (d *DefaultMessageCrypto) Encrypt(encKeys []string,
// we should never reach here
msg := fmt.Sprintf("%v Failed to find encrypted Data
key for key %v", d.logCtx, keyName)
d.logger.Errorf(msg)
- return nil, fmt.Errorf(msg)
+ return nil, errors.New(msg)
}
}
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 6b13b329..00c7e03b 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -126,7 +126,7 @@ func (r *dlqRouter) run() {
Properties: properties,
EventTime: msg.EventTime(),
ReplicationClusters: msg.replicationClusters,
- }, func(messageID MessageID, producerMessage
*ProducerMessage, err error) {
+ }, func(_ MessageID, _ *ProducerMessage, err error) {
if err == nil {
r.log.WithField("msgID",
msgID).Debug("Succeed to send message to DLQ")
// The Producer ack might be coming
from the connection go-routine that
@@ -185,9 +185,8 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
r.log.WithError(err).Error("Failed to create DLQ
producer")
time.Sleep(bo.Next())
continue
- } else {
- r.producer = producer
- return producer
}
+ r.producer = producer
+ return producer
}
}
diff --git a/pulsar/helper_for_test.go b/pulsar/helper_for_test.go
index 426855b2..1cb3f6cb 100644
--- a/pulsar/helper_for_test.go
+++ b/pulsar/helper_for_test.go
@@ -70,7 +70,7 @@ func httpDelete(requestPaths ...string) error {
var errs error
for _, requestPath := range requestPaths {
if err := httpDo(http.MethodDelete, requestPath, nil, nil); err
!= nil {
- errs = pkgerrors.Wrapf(err, "unable to delete url:
%s"+requestPath)
+ errs = pkgerrors.Wrapf(err, "unable to delete url: %s",
requestPath)
}
}
return errs
@@ -186,5 +186,5 @@ func retryAssert(t assert.TestingT, times int, milliseconds
int, update func(),
type fakeAssertT struct{}
-func (fa fakeAssertT) Errorf(format string, args ...interface{}) {
+func (fa fakeAssertT) Errorf(_ string, _ ...interface{}) {
}
diff --git a/pulsar/internal/channel_cond_test.go
b/pulsar/internal/channel_cond_test.go
index a73d44ea..93a04084 100644
--- a/pulsar/internal/channel_cond_test.go
+++ b/pulsar/internal/channel_cond_test.go
@@ -24,7 +24,7 @@ import (
"time"
)
-func TestChCond(t *testing.T) {
+func TestChCond(_ *testing.T) {
cond := newCond(&sync.Mutex{})
wg := sync.WaitGroup{}
wg.Add(1)
@@ -39,7 +39,7 @@ func TestChCond(t *testing.T) {
wg.Wait()
}
-func TestChCondWithContext(t *testing.T) {
+func TestChCondWithContext(_ *testing.T) {
cond := newCond(&sync.Mutex{})
wg := sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
diff --git a/pulsar/internal/compression/noop.go
b/pulsar/internal/compression/noop.go
index 78acb52d..1607c85f 100644
--- a/pulsar/internal/compression/noop.go
+++ b/pulsar/internal/compression/noop.go
@@ -42,7 +42,7 @@ func (noopProvider) Compress(dst, src []byte) []byte {
return dst[:len(src)]
}
-func (noopProvider) Decompress(dst, src []byte, originalSize int) ([]byte,
error) {
+func (noopProvider) Decompress(dst, src []byte, _ int) ([]byte, error) {
if dst == nil {
dst = make([]byte, len(src))
}
diff --git a/pulsar/internal/compression/zstd_cgo.go
b/pulsar/internal/compression/zstd_cgo.go
index dde54ae2..8cad7827 100644
--- a/pulsar/internal/compression/zstd_cgo.go
+++ b/pulsar/internal/compression/zstd_cgo.go
@@ -75,7 +75,7 @@ func (z *zstdCGoProvider) Compress(dst, src []byte) []byte {
return out
}
-func (z *zstdCGoProvider) Decompress(dst, src []byte, originalSize int)
([]byte, error) {
+func (z *zstdCGoProvider) Decompress(dst, src []byte, _ int) ([]byte, error) {
ctx := z.ctxPool.Get().(zstd.Ctx)
defer z.ctxPool.Put(ctx)
return ctx.Decompress(dst, src)
diff --git a/pulsar/internal/compression/zstd_go.go
b/pulsar/internal/compression/zstd_go.go
index ae850783..77389103 100644
--- a/pulsar/internal/compression/zstd_go.go
+++ b/pulsar/internal/compression/zstd_go.go
@@ -58,7 +58,7 @@ func (p *zstdProvider) Compress(dst, src []byte) []byte {
return p.encoder.EncodeAll(src, dst)
}
-func (p *zstdProvider) Decompress(dst, src []byte, originalSize int) ([]byte,
error) {
+func (p *zstdProvider) Decompress(dst, src []byte, _ int) ([]byte, error) {
return p.decoder.DecodeAll(src, dst)
}
diff --git a/pulsar/internal/crypto/consumer_decryptor.go
b/pulsar/internal/crypto/consumer_decryptor.go
index bbc1f9b1..8db89027 100644
--- a/pulsar/internal/crypto/consumer_decryptor.go
+++ b/pulsar/internal/crypto/consumer_decryptor.go
@@ -42,7 +42,7 @@ func NewConsumerDecryptor(keyReader crypto.KeyReader,
}
func (d *consumerDecryptor) Decrypt(payload []byte,
- msgID *pb.MessageIdData,
+ _ *pb.MessageIdData,
msgMetadata *pb.MessageMetadata) ([]byte, error) {
// encryption keys are not present in message metadta, no need decrypt
the payload
if len(msgMetadata.GetEncryptionKeys()) == 0 {
diff --git a/pulsar/internal/crypto/noop_decryptor.go
b/pulsar/internal/crypto/noop_decryptor.go
index c049c472..166b1541 100644
--- a/pulsar/internal/crypto/noop_decryptor.go
+++ b/pulsar/internal/crypto/noop_decryptor.go
@@ -31,7 +31,7 @@ func NewNoopDecryptor() Decryptor {
// Decrypt noop decryptor
func (d *noopDecryptor) Decrypt(payload []byte,
- msgID *pb.MessageIdData,
+ _ *pb.MessageIdData,
msgMetadata *pb.MessageMetadata) ([]byte, error) {
if len(msgMetadata.GetEncryptionKeys()) > 0 {
return payload, fmt.Errorf("incoming message payload is
encrypted, consumer is not configured to decrypt")
diff --git a/pulsar/internal/crypto/noop_encryptor.go
b/pulsar/internal/crypto/noop_encryptor.go
index 4512e7bd..7c52755f 100644
--- a/pulsar/internal/crypto/noop_encryptor.go
+++ b/pulsar/internal/crypto/noop_encryptor.go
@@ -28,6 +28,6 @@ func NewNoopEncryptor() Encryptor {
}
// Encrypt Noop ecryptor
-func (e *noopEncryptor) Encrypt(data []byte, msgMetadata *pb.MessageMetadata)
([]byte, error) {
+func (e *noopEncryptor) Encrypt(data []byte, _ *pb.MessageMetadata) ([]byte,
error) {
return data, nil
}
diff --git a/pulsar/internal/http_client.go b/pulsar/internal/http_client.go
index f1420aea..632d5a4f 100644
--- a/pulsar/internal/http_client.go
+++ b/pulsar/internal/http_client.go
@@ -159,9 +159,7 @@ func (c *httpClient) Get(endpoint string, obj interface{},
params map[string]str
c.log.Debugf("Retrying httpRequest in {%v} with timeout
in {%v}", retryTime, c.requestTimeout)
time.Sleep(retryTime)
_, err = c.GetWithQueryParams(endpoint, obj, params,
true)
- if _, ok := err.(*url.Error); ok {
- continue
- } else {
+ if _, ok := err.(*url.Error); !ok {
// We either succeeded or encountered a non
connection error
break
}
diff --git a/pulsar/internal/lookup_service.go
b/pulsar/internal/lookup_service.go
index 81f496b3..6c07e7d2 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -367,7 +367,7 @@ func (h *httpLookupService) GetTopicsOfNamespace(namespace
string, mode GetTopic
return topics, nil
}
-func (h *httpLookupService) GetSchema(topic string, schemaVersion []byte)
(schema *pb.Schema, err error) {
+func (h *httpLookupService) GetSchema(_ string, _ []byte) (schema *pb.Schema,
err error) {
return nil, errors.New("GetSchema is not supported by
httpLookupService")
}
diff --git a/pulsar/internal/lookup_service_test.go
b/pulsar/internal/lookup_service_test.go
index 349d1929..fd08898a 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -57,7 +57,7 @@ func (c *mockedLookupRPCClient) NewConsumerID() uint64 {
return 1
}
-func (c *mockedLookupRPCClient) RequestToAnyBroker(requestID uint64, cmdType
pb.BaseCommand_Type,
+func (c *mockedLookupRPCClient) RequestToAnyBroker(_ uint64, cmdType
pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {
assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP)
@@ -77,16 +77,16 @@ func (c *mockedLookupRPCClient)
RequestToAnyBroker(requestID uint64, cmdType pb.
}, nil
}
-func (c *mockedLookupRPCClient) RequestToHost(serviceNameResolver
*ServiceNameResolver, requestID uint64,
+func (c *mockedLookupRPCClient) RequestToHost(_ *ServiceNameResolver,
requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
{
return c.RequestToAnyBroker(requestID, cmdType, message)
}
-func (c *mockedLookupRPCClient) LookupService(URL string) LookupService {
+func (c *mockedLookupRPCClient) LookupService(_ string) LookupService {
return nil
}
-func (c *mockedLookupRPCClient) Request(logicalAddr *url.URL, physicalAddr
*url.URL, requestID uint64,
+func (c *mockedLookupRPCClient) Request(logicalAddr *url.URL, physicalAddr
*url.URL, _ uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
{
assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP)
expectedRequest := &c.expectedRequests[0]
@@ -108,14 +108,14 @@ func (c *mockedLookupRPCClient) Request(logicalAddr
*url.URL, physicalAddr *url.
}, nil
}
-func (c *mockedLookupRPCClient) RequestOnCnx(cnx Connection, requestID uint64,
cmdType pb.BaseCommand_Type,
- message proto.Message) (*RPCResult, error) {
+func (c *mockedLookupRPCClient) RequestOnCnx(_ Connection, _ uint64, _
pb.BaseCommand_Type,
+ _ proto.Message) (*RPCResult, error) {
assert.Fail(c.t, "Shouldn't be called")
return nil, nil
}
-func (c *mockedLookupRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType
pb.BaseCommand_Type,
- message proto.Message) error {
+func (c *mockedLookupRPCClient) RequestOnCnxNoWait(_ Connection, _
pb.BaseCommand_Type,
+ _ proto.Message) error {
assert.Fail(c.t, "Shouldn't be called")
return nil
}
@@ -458,7 +458,7 @@ func (m mockedPartitionedTopicMetadataRPCClient)
NewConsumerID() uint64 {
}
func (m mockedPartitionedTopicMetadataRPCClient) RequestToAnyBroker(requestID
uint64, cmdType pb.BaseCommand_Type,
- message proto.Message) (*RPCResult, error) {
+ _ proto.Message) (*RPCResult, error) {
assert.Equal(m.t, cmdType, pb.BaseCommand_PARTITIONED_METADATA)
expectedRequest := &m.expectedRequests[0]
@@ -477,30 +477,30 @@ func (m mockedPartitionedTopicMetadataRPCClient)
RequestToAnyBroker(requestID ui
}, nil
}
-func (m mockedPartitionedTopicMetadataRPCClient) Request(logicalAddr *url.URL,
physicalAddr *url.URL, requestID uint64,
- cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
{
+func (m mockedPartitionedTopicMetadataRPCClient) Request(_ *url.URL, _
*url.URL, _ uint64,
+ _ pb.BaseCommand_Type, _ proto.Message) (*RPCResult, error) {
assert.Fail(m.t, "Shouldn't be called")
return nil, nil
}
-func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnxNoWait(cnx
Connection, cmdType pb.BaseCommand_Type,
- message proto.Message) error {
+func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnxNoWait(_
Connection, _ pb.BaseCommand_Type,
+ _ proto.Message) error {
assert.Fail(m.t, "Shouldn't be called")
return nil
}
-func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnx(cnx Connection,
requestID uint64,
- cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
{
+func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnx(_ Connection, _
uint64,
+ _ pb.BaseCommand_Type, _ proto.Message) (*RPCResult, error) {
assert.Fail(m.t, "Shouldn't be called")
return nil, nil
}
-func (m *mockedPartitionedTopicMetadataRPCClient)
RequestToHost(serviceNameResolver *ServiceNameResolver,
+func (m *mockedPartitionedTopicMetadataRPCClient) RequestToHost(_
*ServiceNameResolver,
requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message)
(*RPCResult, error) {
return m.RequestToAnyBroker(requestID, cmdType, message)
}
-func (m *mockedPartitionedTopicMetadataRPCClient) LookupService(URL string)
LookupService {
+func (m *mockedPartitionedTopicMetadataRPCClient) LookupService(_ string)
LookupService {
return nil
}
@@ -575,7 +575,7 @@ type MockHTTPClient struct {
func (c *MockHTTPClient) Close() {}
-func (c *MockHTTPClient) Get(endpoint string, obj interface{}, params
map[string]string) error {
+func (c *MockHTTPClient) Get(endpoint string, obj interface{}, _
map[string]string) error {
if strings.Contains(endpoint, HTTPLookupServiceBasePathV1) ||
strings.Contains(endpoint,
HTTPLookupServiceBasePathV2) {
return mockHTTPGetLookupResult(obj)
diff --git a/pulsar/internal/pulsartracing/consumer_interceptor.go
b/pulsar/internal/pulsartracing/consumer_interceptor.go
index 3969d45d..3b91e7cb 100644
--- a/pulsar/internal/pulsartracing/consumer_interceptor.go
+++ b/pulsar/internal/pulsartracing/consumer_interceptor.go
@@ -33,9 +33,9 @@ func (t *ConsumerInterceptor) BeforeConsume(message
pulsar.ConsumerMessage) {
buildAndInjectChildSpan(message).Finish()
}
-func (t *ConsumerInterceptor) OnAcknowledge(consumer pulsar.Consumer, msgID
pulsar.MessageID) {}
+func (t *ConsumerInterceptor) OnAcknowledge(_ pulsar.Consumer, _
pulsar.MessageID) {}
-func (t *ConsumerInterceptor) OnNegativeAcksSend(consumer pulsar.Consumer,
msgIDs []pulsar.MessageID) {
+func (t *ConsumerInterceptor) OnNegativeAcksSend(_ pulsar.Consumer, _
[]pulsar.MessageID) {
}
func buildAndInjectChildSpan(message pulsar.ConsumerMessage) opentracing.Span {
diff --git a/pulsar/internal/pulsartracing/consumer_interceptor_test.go
b/pulsar/internal/pulsartracing/consumer_interceptor_test.go
index 05953cf1..1fa1bf0d 100644
--- a/pulsar/internal/pulsartracing/consumer_interceptor_test.go
+++ b/pulsar/internal/pulsartracing/consumer_interceptor_test.go
@@ -52,7 +52,7 @@ func (c *mockConsumer) Subscription() string {
return ""
}
-func (c *mockConsumer) AckWithTxn(msg pulsar.Message, txn pulsar.Transaction)
error {
+func (c *mockConsumer) AckWithTxn(_ pulsar.Message, _ pulsar.Transaction)
error {
return nil
}
@@ -63,7 +63,7 @@ func (c *mockConsumer) UnsubscribeForce() error {
return nil
}
-func (c *mockConsumer) Receive(ctx context.Context) (message pulsar.Message,
err error) {
+func (c *mockConsumer) Receive(_ context.Context) (message pulsar.Message, err
error) {
return nil, nil
}
@@ -71,39 +71,39 @@ func (c *mockConsumer) Chan() <-chan pulsar.ConsumerMessage
{
return nil
}
-func (c *mockConsumer) Ack(msg pulsar.Message) error {
+func (c *mockConsumer) Ack(_ pulsar.Message) error {
return nil
}
-func (c *mockConsumer) AckID(msgID pulsar.MessageID) error {
+func (c *mockConsumer) AckID(_ pulsar.MessageID) error {
return nil
}
-func (c *mockConsumer) AckCumulative(msg pulsar.Message) error {
+func (c *mockConsumer) AckCumulative(_ pulsar.Message) error {
return nil
}
-func (c *mockConsumer) AckIDCumulative(msgID pulsar.MessageID) error {
+func (c *mockConsumer) AckIDCumulative(_ pulsar.MessageID) error {
return nil
}
-func (c *mockConsumer) ReconsumeLater(msg pulsar.Message, delay time.Duration)
{}
+func (c *mockConsumer) ReconsumeLater(_ pulsar.Message, _ time.Duration) {}
-func (c *mockConsumer) ReconsumeLaterWithCustomProperties(msg pulsar.Message,
customProperties map[string]string,
- delay time.Duration) {
+func (c *mockConsumer) ReconsumeLaterWithCustomProperties(_ pulsar.Message, _
map[string]string,
+ _ time.Duration) {
}
-func (c *mockConsumer) Nack(msg pulsar.Message) {}
+func (c *mockConsumer) Nack(_ pulsar.Message) {}
-func (c *mockConsumer) NackID(msgID pulsar.MessageID) {}
+func (c *mockConsumer) NackID(_ pulsar.MessageID) {}
func (c *mockConsumer) Close() {}
-func (c *mockConsumer) Seek(msgID pulsar.MessageID) error {
+func (c *mockConsumer) Seek(_ pulsar.MessageID) error {
return nil
}
-func (c *mockConsumer) SeekByTime(time time.Time) error {
+func (c *mockConsumer) SeekByTime(_ time.Time) error {
return nil
}
diff --git a/pulsar/internal/pulsartracing/message_carrier_adaptors.go
b/pulsar/internal/pulsartracing/message_carrier_adaptors.go
index 3b6394ba..8fcfa17e 100644
--- a/pulsar/internal/pulsartracing/message_carrier_adaptors.go
+++ b/pulsar/internal/pulsartracing/message_carrier_adaptors.go
@@ -38,14 +38,14 @@ func (a *ProducerMessageExtractAdapter) ForeachKey(handler
func(key, val string)
return nil
}
-func (a *ProducerMessageExtractAdapter) Set(key, val string) {}
+func (a *ProducerMessageExtractAdapter) Set(_, _ string) {}
// ProducerMessageInjectAdapter Implements TextMap Interface
type ProducerMessageInjectAdapter struct {
message *pulsar.ProducerMessage
}
-func (a *ProducerMessageInjectAdapter) ForeachKey(handler func(key, val
string) error) error {
+func (a *ProducerMessageInjectAdapter) ForeachKey(_ func(_, _ string) error)
error {
return errors.New("iterator should never be used with Tracer.inject()")
}
@@ -68,14 +68,14 @@ func (a *ConsumerMessageExtractAdapter) ForeachKey(handler
func(key, val string)
return nil
}
-func (a *ConsumerMessageExtractAdapter) Set(key, val string) {}
+func (a *ConsumerMessageExtractAdapter) Set(_, _ string) {}
// ConsumerMessageInjectAdapter Implements TextMap Interface
type ConsumerMessageInjectAdapter struct {
message pulsar.ConsumerMessage
}
-func (a *ConsumerMessageInjectAdapter) ForeachKey(handler func(key, val
string) error) error {
+func (a *ConsumerMessageInjectAdapter) ForeachKey(_ func(_, _ string) error)
error {
return errors.New("iterator should never be used with tracer.inject()")
}
diff --git a/pulsar/internal/pulsartracing/message_carrier_util_test.go
b/pulsar/internal/pulsartracing/message_carrier_util_test.go
index df78ae2f..90658c1a 100644
--- a/pulsar/internal/pulsartracing/message_carrier_util_test.go
+++ b/pulsar/internal/pulsartracing/message_carrier_util_test.go
@@ -112,7 +112,7 @@ func (msg *mockConsumerMessage) GetReplicatedFrom() string {
return ""
}
-func (msg *mockConsumerMessage) GetSchemaValue(v interface{}) error {
+func (msg *mockConsumerMessage) GetSchemaValue(_ interface{}) error {
return nil
}
diff --git a/pulsar/internal/pulsartracing/producer_interceptor.go
b/pulsar/internal/pulsartracing/producer_interceptor.go
index 6c7728cf..77cb8bf9 100644
--- a/pulsar/internal/pulsartracing/producer_interceptor.go
+++ b/pulsar/internal/pulsartracing/producer_interceptor.go
@@ -33,9 +33,9 @@ func (t *ProducerInterceptor) BeforeSend(producer
pulsar.Producer, message *puls
buildAndInjectSpan(message, producer).Finish()
}
-func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer,
- message *pulsar.ProducerMessage,
- msgID pulsar.MessageID) {
+func (t *ProducerInterceptor) OnSendAcknowledgement(_ pulsar.Producer,
+ _ *pulsar.ProducerMessage,
+ _ pulsar.MessageID) {
}
func buildAndInjectSpan(message *pulsar.ProducerMessage, producer
pulsar.Producer) opentracing.Span {
diff --git a/pulsar/internal/pulsartracing/producer_interceptor_test.go
b/pulsar/internal/pulsartracing/producer_interceptor_test.go
index 1c2c712f..74890f73 100644
--- a/pulsar/internal/pulsartracing/producer_interceptor_test.go
+++ b/pulsar/internal/pulsartracing/producer_interceptor_test.go
@@ -67,7 +67,7 @@ func (p *mockProducer) Flush() error {
return nil
}
-func (p *mockProducer) FlushWithCtx(ctx context.Context) error {
+func (p *mockProducer) FlushWithCtx(_ context.Context) error {
return nil
}
diff --git a/pulsar/log/log.go b/pulsar/log/log.go
index 7ed52317..d68c687d 100644
--- a/pulsar/log/log.go
+++ b/pulsar/log/log.go
@@ -24,29 +24,29 @@ func DefaultNopLogger() Logger {
type nopLogger struct{}
-func (l nopLogger) SubLogger(fields Fields) Logger { return l }
-func (l nopLogger) WithFields(fields Fields) Entry { return
nopEntry{} }
-func (l nopLogger) WithField(name string, value interface{}) Entry { return
nopEntry{} }
-func (l nopLogger) WithError(err error) Entry { return
nopEntry{} }
-func (l nopLogger) Debug(args ...interface{}) {}
-func (l nopLogger) Info(args ...interface{}) {}
-func (l nopLogger) Warn(args ...interface{}) {}
-func (l nopLogger) Error(args ...interface{}) {}
-func (l nopLogger) Debugf(format string, args ...interface{}) {}
-func (l nopLogger) Infof(format string, args ...interface{}) {}
-func (l nopLogger) Warnf(format string, args ...interface{}) {}
-func (l nopLogger) Errorf(format string, args ...interface{}) {}
+func (l nopLogger) SubLogger(_ Fields) Logger { return l }
+func (l nopLogger) WithFields(_ Fields) Entry { return
nopEntry{} }
+func (l nopLogger) WithField(_ string, _ interface{}) Entry { return
nopEntry{} }
+func (l nopLogger) WithError(_ error) Entry { return
nopEntry{} }
+func (l nopLogger) Debug(_ ...interface{}) {}
+func (l nopLogger) Info(_ ...interface{}) {}
+func (l nopLogger) Warn(_ ...interface{}) {}
+func (l nopLogger) Error(_ ...interface{}) {}
+func (l nopLogger) Debugf(_ string, _ ...interface{}) {}
+func (l nopLogger) Infof(_ string, _ ...interface{}) {}
+func (l nopLogger) Warnf(_ string, _ ...interface{}) {}
+func (l nopLogger) Errorf(_ string, _ ...interface{}) {}
type nopEntry struct{}
-func (e nopEntry) WithFields(fields Fields) Entry { return
nopEntry{} }
-func (e nopEntry) WithField(name string, value interface{}) Entry { return
nopEntry{} }
+func (e nopEntry) WithFields(_ Fields) Entry { return nopEntry{}
}
+func (e nopEntry) WithField(_ string, _ interface{}) Entry { return nopEntry{}
}
-func (e nopEntry) Debug(args ...interface{}) {}
-func (e nopEntry) Info(args ...interface{}) {}
-func (e nopEntry) Warn(args ...interface{}) {}
-func (e nopEntry) Error(args ...interface{}) {}
-func (e nopEntry) Debugf(format string, args ...interface{}) {}
-func (e nopEntry) Infof(format string, args ...interface{}) {}
-func (e nopEntry) Warnf(format string, args ...interface{}) {}
-func (e nopEntry) Errorf(format string, args ...interface{}) {}
+func (e nopEntry) Debug(_ ...interface{}) {}
+func (e nopEntry) Info(_ ...interface{}) {}
+func (e nopEntry) Warn(_ ...interface{}) {}
+func (e nopEntry) Error(_ ...interface{}) {}
+func (e nopEntry) Debugf(_ string, _ ...interface{}) {}
+func (e nopEntry) Infof(_ string, _ ...interface{}) {}
+func (e nopEntry) Warnf(_ string, _ ...interface{}) {}
+func (e nopEntry) Errorf(_ string, _ ...interface{}) {}
diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go
index 58f56767..1331c7df 100644
--- a/pulsar/negative_acks_tracker.go
+++ b/pulsar/negative_acks_tracker.go
@@ -25,7 +25,7 @@ import (
)
type redeliveryConsumer interface {
- Redeliver(msgIds []messageID)
+ Redeliver(msgIDs []messageID)
}
type negativeAcksTracker struct {
@@ -123,7 +123,7 @@ func (t *negativeAcksTracker) track() {
case <-t.tick.C:
{
now := time.Now()
- msgIds := make([]messageID, 0)
+ msgIDs := make([]messageID, 0)
t.Lock()
@@ -131,15 +131,15 @@ func (t *negativeAcksTracker) track() {
t.log.Debugf("MsgId: %v -- targetTime:
%v -- now: %v", msgID, targetTime, now)
if targetTime.Before(now) {
t.log.Debugf("Adding MsgId:
%v", msgID)
- msgIds = append(msgIds, msgID)
+ msgIDs = append(msgIDs, msgID)
delete(t.negativeAcks, msgID)
}
}
t.Unlock()
- if len(msgIds) > 0 {
- t.rc.Redeliver(msgIds)
+ if len(msgIDs) > 0 {
+ t.rc.Redeliver(msgIDs)
}
}
}
diff --git a/pulsar/negative_acks_tracker_test.go
b/pulsar/negative_acks_tracker_test.go
index 94934123..3f03ac44 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -56,22 +56,22 @@ func newNackMockedConsumer(nackBackoffPolicy
NackBackoffPolicy) *nackMockedConsu
return t
}
-func (nmc *nackMockedConsumer) Redeliver(msgIds []messageID) {
+func (nmc *nackMockedConsumer) Redeliver(msgIDs []messageID) {
nmc.lock.Lock()
defer nmc.lock.Unlock()
if nmc.closed {
return
}
- for _, id := range msgIds {
+ for _, id := range msgIDs {
nmc.ch <- id
}
}
-func sortMessageIds(msgIds []messageID) []messageID {
- sort.Slice(msgIds, func(i, j int) bool {
- return msgIds[i].ledgerID < msgIds[j].entryID
+func sortMessageIDs(msgIDs []messageID) []messageID {
+ sort.Slice(msgIDs, func(i, j int) bool {
+ return msgIDs[i].ledgerID < msgIDs[j].entryID
})
- return msgIds
+ return msgIDs
}
func (nmc *nackMockedConsumer) Wait() <-chan messageID {
@@ -94,17 +94,17 @@ func TestNacksTracker(t *testing.T) {
batchIdx: 1,
})
- msgIds := make([]messageID, 0)
+ msgIDs := make([]messageID, 0)
for id := range nmc.Wait() {
- msgIds = append(msgIds, id)
+ msgIDs = append(msgIDs, id)
}
- msgIds = sortMessageIds(msgIds)
+ msgIDs = sortMessageIDs(msgIDs)
- assert.Equal(t, 2, len(msgIds))
- assert.Equal(t, int64(1), msgIds[0].ledgerID)
- assert.Equal(t, int64(1), msgIds[0].entryID)
- assert.Equal(t, int64(2), msgIds[1].ledgerID)
- assert.Equal(t, int64(2), msgIds[1].entryID)
+ assert.Equal(t, 2, len(msgIDs))
+ assert.Equal(t, int64(1), msgIDs[0].ledgerID)
+ assert.Equal(t, int64(1), msgIDs[0].entryID)
+ assert.Equal(t, int64(2), msgIDs[1].ledgerID)
+ assert.Equal(t, int64(2), msgIDs[1].entryID)
nacks.Close()
// allow multiple Close without panicing
@@ -139,17 +139,17 @@ func TestNacksWithBatchesTracker(t *testing.T) {
batchIdx: 1,
})
- msgIds := make([]messageID, 0)
+ msgIDs := make([]messageID, 0)
for id := range nmc.Wait() {
- msgIds = append(msgIds, id)
+ msgIDs = append(msgIDs, id)
}
- msgIds = sortMessageIds(msgIds)
+ msgIDs = sortMessageIDs(msgIDs)
- assert.Equal(t, 2, len(msgIds))
- assert.Equal(t, int64(1), msgIds[0].ledgerID)
- assert.Equal(t, int64(1), msgIds[0].entryID)
- assert.Equal(t, int64(2), msgIds[1].ledgerID)
- assert.Equal(t, int64(2), msgIds[1].entryID)
+ assert.Equal(t, 2, len(msgIDs))
+ assert.Equal(t, int64(1), msgIDs[0].ledgerID)
+ assert.Equal(t, int64(1), msgIDs[0].entryID)
+ assert.Equal(t, int64(2), msgIDs[1].ledgerID)
+ assert.Equal(t, int64(2), msgIDs[1].entryID)
nacks.Close()
}
@@ -161,17 +161,17 @@ func TestNackBackoffTracker(t *testing.T) {
nacks.AddMessage(new(mockMessage1))
nacks.AddMessage(new(mockMessage2))
- msgIds := make([]messageID, 0)
+ msgIDs := make([]messageID, 0)
for id := range nmc.Wait() {
- msgIds = append(msgIds, id)
+ msgIDs = append(msgIDs, id)
}
- msgIds = sortMessageIds(msgIds)
+ msgIDs = sortMessageIDs(msgIDs)
- assert.Equal(t, 2, len(msgIds))
- assert.Equal(t, int64(1), msgIds[0].ledgerID)
- assert.Equal(t, int64(1), msgIds[0].entryID)
- assert.Equal(t, int64(2), msgIds[1].ledgerID)
- assert.Equal(t, int64(2), msgIds[1].entryID)
+ assert.Equal(t, 2, len(msgIDs))
+ assert.Equal(t, int64(1), msgIDs[0].ledgerID)
+ assert.Equal(t, int64(1), msgIDs[0].entryID)
+ assert.Equal(t, int64(2), msgIDs[1].ledgerID)
+ assert.Equal(t, int64(2), msgIDs[1].entryID)
nacks.Close()
// allow multiple Close without panicing
@@ -230,7 +230,7 @@ func (msg *mockMessage1) GetReplicatedFrom() string {
return ""
}
-func (msg *mockMessage1) GetSchemaValue(v interface{}) error {
+func (msg *mockMessage1) GetSchemaValue(_ interface{}) error {
return nil
}
@@ -306,7 +306,7 @@ func (msg *mockMessage2) GetReplicatedFrom() string {
return ""
}
-func (msg *mockMessage2) GetSchemaValue(v interface{}) error {
+func (msg *mockMessage2) GetSchemaValue(_ interface{}) error {
return nil
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index f578ee4b..b1fc3f02 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1093,7 +1093,7 @@ func (p *partitionProducer) Send(ctx context.Context, msg
*ProducerMessage) (Mes
isDone := uAtomic.NewBool(false)
doneCh := make(chan struct{})
- p.internalSendAsync(ctx, msg, func(ID MessageID, message
*ProducerMessage, e error) {
+ p.internalSendAsync(ctx, msg, func(ID MessageID, _ *ProducerMessage, e
error) {
if isDone.CAS(false, true) {
err = e
msgID = ID
@@ -1375,59 +1375,58 @@ func (p *partitionProducer)
ReceivedSendReceipt(response *pb.CommandSendReceipt)
p.log.Warnf("Received ack for %v on sequenceId %v - expected:
%v, local > remote, ignore it",
response.GetMessageId(), response.GetSequenceId(),
pi.sequenceID)
return
- } else {
- // The ack was indeed for the expected item in the queue, we
can remove it and trigger the callback
- p.pendingQueue.Poll()
+ }
+ // The ack was indeed for the expected item in the queue, we can remove
it and trigger the callback
+ p.pendingQueue.Poll()
- now := time.Now().UnixNano()
+ now := time.Now().UnixNano()
- // lock the pending item while sending the requests
- pi.Lock()
- defer pi.Unlock()
-
p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
- batchSize := int32(len(pi.sendRequests))
- for idx, i := range pi.sendRequests {
- sr := i.(*sendRequest)
- atomic.StoreInt64(&p.lastSequenceID,
int64(pi.sequenceID))
-
- msgID := newMessageID(
- int64(response.MessageId.GetLedgerId()),
- int64(response.MessageId.GetEntryId()),
- int32(idx),
- p.partitionIdx,
- batchSize,
- )
-
- if sr.totalChunks > 1 {
- if sr.chunkID == 0 {
- sr.chunkRecorder.setFirstChunkID(
- &messageID{
-
int64(response.MessageId.GetLedgerId()),
-
int64(response.MessageId.GetEntryId()),
- -1,
- p.partitionIdx,
- 0,
- })
- } else if sr.chunkID == sr.totalChunks-1 {
- sr.chunkRecorder.setLastChunkID(
- &messageID{
-
int64(response.MessageId.GetLedgerId()),
-
int64(response.MessageId.GetEntryId()),
- -1,
- p.partitionIdx,
- 0,
- })
- // use chunkMsgID to set msgID
- msgID = &sr.chunkRecorder.chunkedMsgID
- }
+ // lock the pending item while sending the requests
+ pi.Lock()
+ defer pi.Unlock()
+ p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) /
1.0e9)
+ batchSize := int32(len(pi.sendRequests))
+ for idx, i := range pi.sendRequests {
+ sr := i.(*sendRequest)
+ atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
+
+ msgID := newMessageID(
+ int64(response.MessageId.GetLedgerId()),
+ int64(response.MessageId.GetEntryId()),
+ int32(idx),
+ p.partitionIdx,
+ batchSize,
+ )
+
+ if sr.totalChunks > 1 {
+ if sr.chunkID == 0 {
+ sr.chunkRecorder.setFirstChunkID(
+ &messageID{
+
int64(response.MessageId.GetLedgerId()),
+
int64(response.MessageId.GetEntryId()),
+ -1,
+ p.partitionIdx,
+ 0,
+ })
+ } else if sr.chunkID == sr.totalChunks-1 {
+ sr.chunkRecorder.setLastChunkID(
+ &messageID{
+
int64(response.MessageId.GetLedgerId()),
+
int64(response.MessageId.GetEntryId()),
+ -1,
+ p.partitionIdx,
+ 0,
+ })
+ // use chunkMsgID to set msgID
+ msgID = &sr.chunkRecorder.chunkedMsgID
}
-
- sr.done(msgID, nil)
}
- // Mark this pending item as done
- pi.done(nil)
+ sr.done(msgID, nil)
}
+
+ // Mark this pending item as done
+ pi.done(nil)
}
func (p *partitionProducer) internalClose(req *closeProducer) {
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index afd1f09a..b58c0608 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -153,7 +153,7 @@ func TestProducerAsyncSend(t *testing.T) {
for i := 0; i < 10; i++ {
producer.SendAsync(context.Background(), &ProducerMessage{
Payload: []byte("hello"),
- }, func(id MessageID, message *ProducerMessage, e error) {
+ }, func(id MessageID, _ *ProducerMessage, e error) {
if e != nil {
log.WithError(e).Error("Failed to publish")
errors.Put(e)
@@ -174,7 +174,7 @@ func TestProducerAsyncSend(t *testing.T) {
assert.Equal(t, 0, errors.Size())
wg.Add(1)
- producer.SendAsync(context.Background(), nil, func(id MessageID, m
*ProducerMessage, e error) {
+ producer.SendAsync(context.Background(), nil, func(id MessageID, _
*ProducerMessage, e error) {
assert.NotNil(t, e)
assert.Nil(t, id)
wg.Done()
@@ -183,7 +183,7 @@ func TestProducerAsyncSend(t *testing.T) {
wg.Add(1)
producer.SendAsync(context.Background(), &ProducerMessage{Payload:
[]byte("hello"), Value: []byte("hello")},
- func(id MessageID, m *ProducerMessage, e error) {
+ func(id MessageID, _ *ProducerMessage, e error) {
assert.NotNil(t, e)
assert.Nil(t, id)
wg.Done()
@@ -214,7 +214,7 @@ func TestProducerFlushDisableBatching(t *testing.T) {
for i := 0; i < 10; i++ {
producer.SendAsync(context.Background(), &ProducerMessage{
Payload: []byte("hello"),
- }, func(id MessageID, message *ProducerMessage, e error) {
+ }, func(id MessageID, _ *ProducerMessage, e error) {
if e != nil {
log.WithError(e).Error("Failed to publish")
errors.Put(e)
@@ -383,7 +383,7 @@ func TestFlushInProducer(t *testing.T) {
messageContent := prefix + fmt.Sprintf("%d", i)
producer.SendAsync(ctx, &ProducerMessage{
Payload: []byte(messageContent),
- }, func(id MessageID, producerMessage *ProducerMessage, e
error) {
+ }, func(id MessageID, _ *ProducerMessage, e error) {
if e != nil {
log.WithError(e).Error("Failed to publish")
errors.Put(e)
@@ -424,7 +424,7 @@ func TestFlushInProducer(t *testing.T) {
messageContent := prefix + fmt.Sprintf("%d", i)
producer.SendAsync(ctx, &ProducerMessage{
Payload: []byte(messageContent),
- }, func(id MessageID, producerMessage *ProducerMessage, e
error) {
+ }, func(id MessageID, _ *ProducerMessage, e error) {
if e != nil {
log.WithError(e).Error("Failed to publish")
errors.Put(e)
@@ -494,7 +494,7 @@ func TestFlushInPartitionedProducer(t *testing.T) {
messageContent := prefix + fmt.Sprintf("%d", i)
producer.SendAsync(ctx, &ProducerMessage{
Payload: []byte(messageContent),
- }, func(id MessageID, producerMessage *ProducerMessage, e
error) {
+ }, func(id MessageID, _ *ProducerMessage, e error) {
if e != nil {
log.WithError(e).Error("Failed to publish")
errors.Put(e)
@@ -844,7 +844,7 @@ func TestBatchMessageFlushing(t *testing.T) {
msg := &ProducerMessage{
Payload: msg,
}
- producer.SendAsync(ctx, msg, func(id MessageID, producerMessage
*ProducerMessage, err error) {
+ producer.SendAsync(ctx, msg, func(_ MessageID, _
*ProducerMessage, _ error) {
ch <- struct{}{}
})
}
@@ -898,7 +898,7 @@ func TestBatchDelayMessage(t *testing.T) {
}
var delayMsgID int64
ch := make(chan struct{}, 2)
- producer.SendAsync(ctx, delayMsg, func(id MessageID, producerMessage
*ProducerMessage, err error) {
+ producer.SendAsync(ctx, delayMsg, func(id MessageID, _
*ProducerMessage, _ error) {
atomic.StoreInt64(&delayMsgID, id.(*messageID).entryID)
ch <- struct{}{}
})
@@ -914,7 +914,7 @@ func TestBatchDelayMessage(t *testing.T) {
Payload: []byte("no delay"),
}
var noDelayMsgID int64
- producer.SendAsync(ctx, noDelayMsg, func(id MessageID, producerMessage
*ProducerMessage, err error) {
+ producer.SendAsync(ctx, noDelayMsg, func(id MessageID, _
*ProducerMessage, _ error) {
atomic.StoreInt64(&noDelayMsgID, id.(*messageID).entryID)
})
for i := 0; i < 2; i++ {
@@ -1174,7 +1174,7 @@ func TestFailedSchemaEncode(t *testing.T) {
// producer should send return an error as message is Int64, but schema
is String
producer.SendAsync(ctx, &ProducerMessage{
Value: int64(1),
- }, func(messageID MessageID, producerMessage *ProducerMessage, err
error) {
+ }, func(messageID MessageID, _ *ProducerMessage, err error) {
assert.NotNil(t, err)
assert.Nil(t, messageID)
wg.Done()
@@ -1503,9 +1503,9 @@ func TestProducuerSendFailOnInvalidKey(t *testing.T) {
type noopProduceInterceptor struct{}
-func (noopProduceInterceptor) BeforeSend(producer Producer, message
*ProducerMessage) {}
+func (noopProduceInterceptor) BeforeSend(_ Producer, _ *ProducerMessage) {}
-func (noopProduceInterceptor) OnSendAcknowledgement(producer Producer, message
*ProducerMessage, msgID MessageID) {
+func (noopProduceInterceptor) OnSendAcknowledgement(_ Producer, _
*ProducerMessage, _ MessageID) {
}
// copyPropertyIntercepotr copy all keys in message properties map and add a
suffix
@@ -1514,11 +1514,11 @@ type metricProduceInterceptor struct {
ackn int
}
-func (x *metricProduceInterceptor) BeforeSend(producer Producer, message
*ProducerMessage) {
+func (x *metricProduceInterceptor) BeforeSend(_ Producer, _ *ProducerMessage) {
x.sendn++
}
-func (x *metricProduceInterceptor) OnSendAcknowledgement(producer Producer,
message *ProducerMessage, msgID MessageID) {
+func (x *metricProduceInterceptor) OnSendAcknowledgement(_ Producer, _
*ProducerMessage, _ MessageID) {
x.ackn++
}
@@ -1734,7 +1734,7 @@ func TestMultipleSchemaOfKeyBasedBatchProducerConsumer(t
*testing.T) {
Payload: messageContent,
Key: key,
Schema: schema,
- }, func(id MessageID, producerMessage *ProducerMessage,
err error) {
+ }, func(id MessageID, _ *ProducerMessage, err error) {
assert.NoError(t, err)
assert.NotNil(t, id)
})
@@ -1827,7 +1827,7 @@ func TestMultipleSchemaProducerConsumer(t *testing.T) {
Payload: messageContent,
Key: key,
Schema: schema,
- }, func(id MessageID, producerMessage *ProducerMessage, err
error) {
+ }, func(id MessageID, _ *ProducerMessage, err error) {
assert.NoError(t, err)
assert.NotNil(t, id)
})
@@ -2025,16 +2025,16 @@ func TestMemLimitRejectProducerMessages(t *testing.T) {
for i := 0; i < n/2; i++ {
producer1.SendAsync(context.Background(), &ProducerMessage{
Payload: make([]byte, 1024),
- }, func(id MessageID, message *ProducerMessage, e error) {})
+ }, func(_ MessageID, _ *ProducerMessage, _ error) {})
producer2.SendAsync(context.Background(), &ProducerMessage{
Payload: make([]byte, 1024),
- }, func(id MessageID, message *ProducerMessage, e error) {})
+ }, func(_ MessageID, _ *ProducerMessage, _ error) {})
}
// Last message in order to reach the limit
producer1.SendAsync(context.Background(), &ProducerMessage{
Payload: make([]byte, 1024),
- }, func(id MessageID, message *ProducerMessage, e error) {})
+ }, func(_ MessageID, _ *ProducerMessage, _ error) {})
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int64(n*1024), c.(*client).memLimit.CurrentUsage())
@@ -2112,18 +2112,18 @@ func TestMemLimitRejectProducerMessagesWithSchema(t
*testing.T) {
producer1.SendAsync(context.Background(), &ProducerMessage{
Value: value,
Schema: schema,
- }, func(id MessageID, message *ProducerMessage, e error) {})
+ }, func(_ MessageID, _ *ProducerMessage, _ error) {})
producer2.SendAsync(context.Background(), &ProducerMessage{
Value: value,
Schema: schema,
- }, func(id MessageID, message *ProducerMessage, e error) {})
+ }, func(_ MessageID, _ *ProducerMessage, _ error) {})
}
// Last message in order to reach the limit
producer1.SendAsync(context.Background(), &ProducerMessage{
Value: value,
Schema: schema,
- }, func(id MessageID, message *ProducerMessage, e error) {})
+ }, func(_ MessageID, _ *ProducerMessage, _ error) {})
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int64(n*6), c.(*client).memLimit.CurrentUsage())
@@ -2187,7 +2187,7 @@ func TestMemLimitRejectProducerMessagesWithChunking(t
*testing.T) {
producer2.SendAsync(context.Background(), &ProducerMessage{
Payload: make([]byte, 5*1024+1),
- }, func(id MessageID, message *ProducerMessage, e error) {
+ }, func(_ MessageID, _ *ProducerMessage, e error) {
if e != nil {
t.Fatal(e)
}
@@ -2247,7 +2247,7 @@ func TestMemLimitContextCancel(t *testing.T) {
for i := 0; i < n; i++ {
producer.SendAsync(ctx, &ProducerMessage{
Payload: make([]byte, 1024),
- }, func(id MessageID, message *ProducerMessage, e error) {})
+ }, func(_ MessageID, _ *ProducerMessage, _ error) {})
}
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int64(n*1024), c.(*client).memLimit.CurrentUsage())
@@ -2257,7 +2257,7 @@ func TestMemLimitContextCancel(t *testing.T) {
go func() {
producer.SendAsync(ctx, &ProducerMessage{
Payload: make([]byte, 1024),
- }, func(id MessageID, message *ProducerMessage, e error) {
+ }, func(_ MessageID, _ *ProducerMessage, e error) {
assert.Error(t, e)
assert.ErrorContains(t, e, getResultStr(TimeoutError))
wg.Done()
@@ -2369,7 +2369,7 @@ func TestFailPendingMessageWithClose(t *testing.T) {
for i := 0; i < 3; i++ {
testProducer.SendAsync(context.Background(), &ProducerMessage{
Payload: make([]byte, 1024),
- }, func(id MessageID, message *ProducerMessage, e error) {
+ }, func(_ MessageID, _ *ProducerMessage, e error) {
if e != nil {
assert.True(t, errors.Is(e, ErrProducerClosed))
}
@@ -2517,7 +2517,7 @@ func TestProducerWithMaxConnectionsPerBroker(t
*testing.T) {
var ok int32
testProducer.SendAsync(context.Background(),
&ProducerMessage{Value: []byte("hello")},
- func(id MessageID, producerMessage *ProducerMessage,
err error) {
+ func(_ MessageID, _ *ProducerMessage, err error) {
if err == nil {
atomic.StoreInt32(&ok, 1)
}
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 3c928c1d..d2fa5597 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -320,7 +320,7 @@ func TestReaderOnSpecificMessageWithBatching(t *testing.T) {
producer.SendAsync(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
- }, func(id MessageID, producerMessage *ProducerMessage, err
error) {
+ }, func(id MessageID, _ *ProducerMessage, err error) {
assert.NoError(t, err)
assert.NotNil(t, id)
msgIDs[idx] = id
@@ -396,7 +396,7 @@ func TestReaderOnLatestWithBatching(t *testing.T) {
producer.SendAsync(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
- }, func(id MessageID, producerMessage *ProducerMessage, err
error) {
+ }, func(id MessageID, _ *ProducerMessage, err error) {
assert.NoError(t, err)
assert.NotNil(t, id)
msgIDs[idx] = id
diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go
index c8aa0b94..a9a67adb 100644
--- a/pulsar/retry_router.go
+++ b/pulsar/retry_router.go
@@ -98,8 +98,8 @@ func (r *retryRouter) run() {
producer := r.getProducer()
msgID := rm.consumerMsg.ID()
- producer.SendAsync(context.Background(),
&rm.producerMsg, func(messageID MessageID,
- producerMessage *ProducerMessage, err error) {
+ producer.SendAsync(context.Background(),
&rm.producerMsg, func(_ MessageID,
+ _ *ProducerMessage, err error) {
if err != nil {
r.log.WithError(err).WithField("msgID",
msgID).Error("Failed to send message to RLQ")
rm.consumerMsg.Consumer.Nack(rm.consumerMsg)
@@ -150,9 +150,8 @@ func (r *retryRouter) getProducer() Producer {
r.log.WithError(err).Error("Failed to create RLQ
producer")
time.Sleep(bo.Next())
continue
- } else {
- r.producer = producer
- return producer
}
+ r.producer = producer
+ return producer
}
}
diff --git a/pulsar/table_view_test.go b/pulsar/table_view_test.go
index 2368e3d8..f9f8aec1 100644
--- a/pulsar/table_view_test.go
+++ b/pulsar/table_view_test.go
@@ -55,7 +55,7 @@ func TestTableView(t *testing.T) {
t.Log(key)
_, err = producer.Send(context.Background(), &ProducerMessage{
Key: key,
- Value: fmt.Sprintf(valuePrefix + key),
+ Value: valuePrefix + key,
})
assert.NoError(t, err)
}
@@ -329,7 +329,7 @@ func TestForEachAndListenJSONSchema(t *testing.T) {
t.Log("foreach" + key)
s, ok := value.(testJSON)
assert.Truef(t, ok, "expected value to be testJSON type got
%T", value)
- assert.Equal(t, fmt.Sprintf(valuePrefix+key), s.Name)
+ assert.Equal(t, valuePrefix+key, s.Name)
return nil
})
@@ -349,7 +349,7 @@ func TestForEachAndListenJSONSchema(t *testing.T) {
Key: key,
Value: testJSON{
ID: i,
- Name: fmt.Sprintf(valuePrefix + key),
+ Name: valuePrefix + key,
},
})
assert.NoError(t, err)
diff --git a/pulsar/transaction_impl.go b/pulsar/transaction_impl.go
index 2eb8aca9..82e6fcfa 100644
--- a/pulsar/transaction_impl.go
+++ b/pulsar/transaction_impl.go
@@ -84,7 +84,7 @@ func (txn *transaction) GetState() TxnState {
return txn.state
}
-func (txn *transaction) Commit(ctx context.Context) error {
+func (txn *transaction) Commit(_ context.Context) error {
if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), int32(TxnOpen),
int32(TxnCommitting)) ||
txn.state == TxnCommitting) {
return newError(InvalidStatus, "Expect transaction state is
TxnOpen but "+txn.state.string())
@@ -110,7 +110,7 @@ func (txn *transaction) Commit(ctx context.Context) error {
return err
}
-func (txn *transaction) Abort(ctx context.Context) error {
+func (txn *transaction) Abort(_ context.Context) error {
if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), int32(TxnOpen),
int32(TxnAborting)) ||
txn.state == TxnAborting) {
return newError(InvalidStatus, "Expect transaction state is
TxnOpen but "+txn.state.string())
diff --git a/pulsaradmin/pkg/admin/admin_test.go
b/pulsaradmin/pkg/admin/admin_test.go
index c4fa5295..f6cd3263 100644
--- a/pulsaradmin/pkg/admin/admin_test.go
+++ b/pulsaradmin/pkg/admin/admin_test.go
@@ -61,7 +61,7 @@ type customAuthProvider struct {
var _ auth.Provider = &customAuthProvider{}
-func (c *customAuthProvider) RoundTrip(req *http.Request) (*http.Response,
error) {
+func (c *customAuthProvider) RoundTrip(_ *http.Request) (*http.Response,
error) {
panic("implement me")
}
diff --git a/pulsaradmin/pkg/admin/auth/oauth2.go
b/pulsaradmin/pkg/admin/auth/oauth2.go
index 4bd15461..59587abf 100644
--- a/pulsaradmin/pkg/admin/auth/oauth2.go
+++ b/pulsaradmin/pkg/admin/auth/oauth2.go
@@ -121,7 +121,7 @@ func NewAuthenticationOAuth2WithParams(
issuerEndpoint,
clientID,
audience string,
- scope string,
+ _ string,
transport http.RoundTripper) (*OAuth2Provider, error) {
issuer := oauth2.Issuer{
@@ -257,6 +257,6 @@ func makeKeyring() (keyring.Keyring, error) {
})
}
-func keyringPrompt(prompt string) (string, error) {
+func keyringPrompt(_ string) (string, error) {
return "", nil
}
diff --git a/pulsaradmin/pkg/admin/auth/oauth2_test.go
b/pulsaradmin/pkg/admin/auth/oauth2_test.go
index b25e5761..b19133c7 100644
--- a/pulsaradmin/pkg/admin/auth/oauth2_test.go
+++ b/pulsaradmin/pkg/admin/auth/oauth2_test.go
@@ -37,7 +37,7 @@ func mockOAuthServer() *httptest.Server {
// mock the used REST path for the tests
mockedHandler := http.NewServeMux()
- mockedHandler.HandleFunc("/.well-known/openid-configuration",
func(writer http.ResponseWriter, request *http.Request) {
+ mockedHandler.HandleFunc("/.well-known/openid-configuration",
func(writer http.ResponseWriter, _ *http.Request) {
s := fmt.Sprintf(`{
"issuer":"%s",
"authorization_endpoint":"%s/authorize",
@@ -46,10 +46,10 @@ func mockOAuthServer() *httptest.Server {
}`, server.URL, server.URL, server.URL, server.URL)
fmt.Fprintln(writer, s)
})
- mockedHandler.HandleFunc("/oauth/token", func(writer
http.ResponseWriter, request *http.Request) {
+ mockedHandler.HandleFunc("/oauth/token", func(writer
http.ResponseWriter, _ *http.Request) {
fmt.Fprintln(writer, "{\n \"access_token\":
\"token-content\",\n \"token_type\": \"Bearer\"\n}")
})
- mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter,
request *http.Request) {
+ mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter,
_ *http.Request) {
fmt.Fprintln(writer, "true")
})
diff --git a/pulsaradmin/pkg/admin/namespace.go
b/pulsaradmin/pkg/admin/namespace.go
index 9c3f3e3f..e92f020c 100644
--- a/pulsaradmin/pkg/admin/namespace.go
+++ b/pulsaradmin/pkg/admin/namespace.go
@@ -148,7 +148,7 @@ type Namespaces interface {
GetNamespaceReplicationClusters(namespace string) ([]string, error)
// SetNamespaceReplicationClusters returns the replication clusters for
a namespace
- SetNamespaceReplicationClusters(namespace string, clusterIds []string)
error
+ SetNamespaceReplicationClusters(namespace string, clusterIDs []string)
error
// SetNamespaceAntiAffinityGroup sets anti-affinity group name for a
namespace
SetNamespaceAntiAffinityGroup(namespace string,
namespaceAntiAffinityGroup string) error
@@ -616,13 +616,13 @@ func (n *namespaces)
GetNamespaceReplicationClusters(namespace string) ([]string
return data, err
}
-func (n *namespaces) SetNamespaceReplicationClusters(namespace string,
clusterIds []string) error {
+func (n *namespaces) SetNamespaceReplicationClusters(namespace string,
clusterIDs []string) error {
nsName, err := utils.GetNamespaceName(namespace)
if err != nil {
return err
}
endpoint := n.pulsar.endpoint(n.basePath, nsName.String(),
"replication")
- return n.pulsar.Client.Post(endpoint, &clusterIds)
+ return n.pulsar.Client.Post(endpoint, &clusterIDs)
}
func (n *namespaces) SetNamespaceAntiAffinityGroup(namespace string,
namespaceAntiAffinityGroup string) error {
diff --git a/pulsaradmin/pkg/admin/subscription_test.go
b/pulsaradmin/pkg/admin/subscription_test.go
index 98db6961..087a4a37 100644
--- a/pulsaradmin/pkg/admin/subscription_test.go
+++ b/pulsaradmin/pkg/admin/subscription_test.go
@@ -67,7 +67,7 @@ func TestGetMessagesByID(t *testing.T) {
for i := 0; i <= numberMessages; i++ {
producer.SendAsync(ctx, &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
- }, func(id pulsar.MessageID, message *pulsar.ProducerMessage,
err error) {
+ }, func(id pulsar.MessageID, _ *pulsar.ProducerMessage, err
error) {
assert.Nil(t, err)
messageIDMap[id.String()]++
wg.Done()
diff --git a/pulsaradmin/pkg/utils/data.go b/pulsaradmin/pkg/utils/data.go
index 61607912..c9664e71 100644
--- a/pulsaradmin/pkg/utils/data.go
+++ b/pulsaradmin/pkg/utils/data.go
@@ -215,7 +215,7 @@ type NamespacesData struct {
MessageTTL int `json:"messageTTL"`
BookkeeperAckQuorum int `json:"bookkeeperAckQuorum"`
ManagedLedgerMaxMarkDeleteRate float64
`json:"managedLedgerMaxMarkDeleteRate"`
- ClusterIds string `json:"clusterIds"`
+ ClusterIDs string `json:"clusterIds"`
RetentionTimeStr string `json:"retentionTimeStr"`
LimitStr string `json:"limitStr"`
LimitTime int64 `json:"limitTime"`
diff --git a/pulsaradmin/pkg/utils/schema_util.go
b/pulsaradmin/pkg/utils/schema_util.go
index 671627fa..b8a17d96 100644
--- a/pulsaradmin/pkg/utils/schema_util.go
+++ b/pulsaradmin/pkg/utils/schema_util.go
@@ -56,11 +56,9 @@ type IsCompatibility struct {
func ConvertGetSchemaResponseToSchemaInfo(tn *TopicName, response
GetSchemaResponse) *SchemaInfo {
info := new(SchemaInfo)
schema := make([]byte, 0, 10)
- if response.Type == "KEY_VALUE" {
- // TODO: impl logic
- } else {
+ if response.Type != "KEY_VALUE" {
schema = []byte(response.Data)
- }
+ } // TODO: impl logic for KEY_VALUE
info.Schema = schema
info.Type = response.Type