This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 64bfd32 fix: add service not ready check (#757)
64bfd32 is described below
commit 64bfd327148c0b771f4ea462859808a961686ff4
Author: Zixuan Liu <[email protected]>
AuthorDate: Tue Apr 12 22:47:08 2022 +0800
fix: add service not ready check (#757)
### Motivation
We are using this client with AuthenticationOAuth2 to connect to the Pulsar
proxy.
If the OAuth2 token has expired, the client still works, but if the broker
service is then restarted, the client fails to reconnect. You can see these
logs on the client:
```
time="2022-03-25T22:34:59Z" level=info msg="[Reconnecting to broker in
1m4.607284318s]" producerID=853 producer_name=test-2-9-81-426
topic="persistent://private/test/topic-52"
time="2022-03-25T22:35:03Z" level=warning msg="[Failed to lookup topic]"
error=ServiceNotReady message="Disconnected from server at
test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650"
serviceURL="pulsar+ssl://test-2-9.cv-pulsar.sn3.dev:6651"
topic="persistent://private/test/topic-52"
time="2022-03-25T22:35:03Z" level=warning msg="[Failed to lookup topic]"
error=ServiceNotReady producerID=553 producer_name=test-2-9-81-276
topic="persistent://private/test/topic-52"
time="2022-03-25T22:35:03Z" level=error msg="[Failed to create producer at
reconnect]" error=ServiceNotReady producerID=553 producer_name=test-2-9-81-276
topic="persistent://private/test/topic-52"
```
These errors and warnings appear on the proxy:
```
2022-03-25T22:36:22,539+0000 [pulsar-proxy-io-2-2] INFO
org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x5cf3d820,
L:/10.24.10.175:44760 -
R:test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650]] Connected to
server
2022-03-25T22:36:22,540+0000 [pulsar-proxy-io-2-2] WARN
org.apache.pulsar.client.impl.ClientCnx - [id: 0x5cf3d820,
L:/10.24.10.175:44760 -
R:test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650] Received error
from server: Unable to authenticate
2022-03-25T22:36:22,540+0000 [pulsar-proxy-io-2-2] ERROR
org.apache.pulsar.client.impl.ClientCnx - [id: 0x5cf3d820,
L:/10.24.10.175:44760 -
R:test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650] Failed to
authenticate the client
2022-03-25T22:36:22,541+0000 [pulsar-proxy-io-2-2] WARN
org.apache.pulsar.client.impl.ClientCnx - [id: 0x5cf3d820,
L:/10.24.10.175:44760 -
R:test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650] Received
unknown request id from server: -1
2022-03-25T22:36:22,541+0000 [pulsar-proxy-io-2-2] INFO
org.apache.pulsar.client.impl.ClientCnx - [id: 0x5cf3d820,
L:/10.24.10.175:44760 !
R:test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650] Disconnected
2022-03-25T22:36:22,541+0000 [pulsar-proxy-io-2-2] WARN
org.apache.pulsar.proxy.server.LookupProxyHandler -
[persistent://private/test/topic-13] failed to get Partitioned metadata :
Disconnected from server at
test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650
org.apache.pulsar.client.api.PulsarClientException$ConnectException:
Disconnected from server at
test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650
at
org.apache.pulsar.client.impl.ClientCnx.channelInactive(ClientCnx.java:266)
[io.streamnative-pulsar-client-original-2.9.2.9.jar:2.9.2.9]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:392)
[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:357)
[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
[io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
[io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
[io.netty-netty-transport-classes-epoll-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
[io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
[io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
[io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
at java.lang.Thread.run(Thread.java:829) [?:?]
```
In this case, we need to close the connection in order to re-authenticate.
### Modifications
Check the `ServerError` from the cmd response; if it equals
`ServerError_ServiceNotReady`, we need to disconnect from the proxy.
---
pulsar/internal/connection.go | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index a025abf..6055252 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -522,10 +522,12 @@ func (c *connection) internalReceivedCommand(cmd
*pb.BaseCommand, headersAndPayl
c.handleResponse(cmd.ProducerSuccess.GetRequestId(), cmd)
case pb.BaseCommand_PARTITIONED_METADATA_RESPONSE:
+ c.checkServerError(cmd.PartitionMetadataResponse.Error)
c.handleResponse(cmd.PartitionMetadataResponse.GetRequestId(),
cmd)
case pb.BaseCommand_LOOKUP_RESPONSE:
lookupResult := cmd.LookupTopicResponse
+ c.checkServerError(lookupResult.Error)
c.handleResponse(lookupResult.GetRequestId(), cmd)
case pb.BaseCommand_CONSUMER_STATS_RESPONSE:
@@ -574,6 +576,16 @@ func (c *connection) internalReceivedCommand(cmd
*pb.BaseCommand, headersAndPayl
}
}
+func (c *connection) checkServerError(err *pb.ServerError) {
+ if err == nil {
+ return
+ }
+
+ if *err == pb.ServerError_ServiceNotReady {
+ c.Close()
+ }
+}
+
func (c *connection) Write(data Buffer) {
c.writeRequestsCh <- data
}