This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 64bfd32  fix: add service not ready check (#757)
64bfd32 is described below

commit 64bfd327148c0b771f4ea462859808a961686ff4
Author: Zixuan Liu <[email protected]>
AuthorDate: Tue Apr 12 22:47:08 2022 +0800

    fix: add service not ready check (#757)
    
    ### Motivation
    
    We are using this client with AuthenticationOAuth2 to connect to the 
Pulsar proxy.
    
    If the OAuth2 token has expired, the client still works, but if the broker 
service is then restarted, the client fails to reconnect. You can see these 
logs on the client:
    
    ```
    time="2022-03-25T22:34:59Z" level=info msg="[Reconnecting to broker in  
1m4.607284318s]" producerID=853 producer_name=test-2-9-81-426 
topic="persistent://private/test/topic-52"
    time="2022-03-25T22:35:03Z" level=warning msg="[Failed to lookup topic]" 
error=ServiceNotReady message="Disconnected from server at 
test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650" 
serviceURL="pulsar+ssl://test-2-9.cv-pulsar.sn3.dev:6651" 
topic="persistent://private/test/topic-52"
    time="2022-03-25T22:35:03Z" level=warning msg="[Failed to lookup topic]" 
error=ServiceNotReady producerID=553 producer_name=test-2-9-81-276 
topic="persistent://private/test/topic-52"
    time="2022-03-25T22:35:03Z" level=error msg="[Failed to create producer at 
reconnect]" error=ServiceNotReady producerID=553 producer_name=test-2-9-81-276 
topic="persistent://private/test/topic-52"
    ```
    
    
    These errors and warnings appear on the proxy:
    ```
    2022-03-25T22:36:22,539+0000 [pulsar-proxy-io-2-2] INFO  
org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x5cf3d820, 
L:/10.24.10.175:44760 - 
R:test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650]] Connected to 
server
    2022-03-25T22:36:22,540+0000 [pulsar-proxy-io-2-2] WARN  
org.apache.pulsar.client.impl.ClientCnx - [id: 0x5cf3d820, 
L:/10.24.10.175:44760 - 
R:test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650] Received error 
from server: Unable to authenticate
    2022-03-25T22:36:22,540+0000 [pulsar-proxy-io-2-2] ERROR 
org.apache.pulsar.client.impl.ClientCnx - [id: 0x5cf3d820, 
L:/10.24.10.175:44760 - 
R:test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650] Failed to 
authenticate the client
    2022-03-25T22:36:22,541+0000 [pulsar-proxy-io-2-2] WARN  
org.apache.pulsar.client.impl.ClientCnx - [id: 0x5cf3d820, 
L:/10.24.10.175:44760 - 
R:test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650] Received 
unknown request id from server: -1
    2022-03-25T22:36:22,541+0000 [pulsar-proxy-io-2-2] INFO  
org.apache.pulsar.client.impl.ClientCnx - [id: 0x5cf3d820, 
L:/10.24.10.175:44760 ! 
R:test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650] Disconnected
    2022-03-25T22:36:22,541+0000 [pulsar-proxy-io-2-2] WARN  
org.apache.pulsar.proxy.server.LookupProxyHandler - 
[persistent://private/test/topic-13] failed to get Partitioned metadata : 
Disconnected from server at 
test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650
    org.apache.pulsar.client.api.PulsarClientException$ConnectException: 
Disconnected from server at 
test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650
            at 
org.apache.pulsar.client.impl.ClientCnx.channelInactive(ClientCnx.java:266) 
[io.streamnative-pulsar-client-original-2.9.2.9.jar:2.9.2.9]
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
 [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
 [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
 [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:392)
 [io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:357)
 [io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
 [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
 [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
 [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
 [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
 [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
 [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
 [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813) 
[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
 [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
 [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) 
[io.netty-netty-transport-classes-epoll-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
 [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 
[io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
            at java.lang.Thread.run(Thread.java:829) [?:?]
    
    ```
    
    In this case, we need to close this connection in order to re-authenticate.
    
    ### Modifications
    
    Check the `ServerError` from the cmd response; if this error equals 
`ServerError_ServiceNotReady`, we need to disconnect from the proxy.
---
 pulsar/internal/connection.go | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index a025abf..6055252 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -522,10 +522,12 @@ func (c *connection) internalReceivedCommand(cmd 
*pb.BaseCommand, headersAndPayl
                c.handleResponse(cmd.ProducerSuccess.GetRequestId(), cmd)
 
        case pb.BaseCommand_PARTITIONED_METADATA_RESPONSE:
+               c.checkServerError(cmd.PartitionMetadataResponse.Error)
                c.handleResponse(cmd.PartitionMetadataResponse.GetRequestId(), 
cmd)
 
        case pb.BaseCommand_LOOKUP_RESPONSE:
                lookupResult := cmd.LookupTopicResponse
+               c.checkServerError(lookupResult.Error)
                c.handleResponse(lookupResult.GetRequestId(), cmd)
 
        case pb.BaseCommand_CONSUMER_STATS_RESPONSE:
@@ -574,6 +576,16 @@ func (c *connection) internalReceivedCommand(cmd 
*pb.BaseCommand, headersAndPayl
        }
 }
 
+func (c *connection) checkServerError(err *pb.ServerError) {
+       if err == nil {
+               return
+       }
+
+       if *err == pb.ServerError_ServiceNotReady {
+               c.Close()
+       }
+}
+
 func (c *connection) Write(data Buffer) {
        c.writeRequestsCh <- data
 }

Reply via email to