loserwang1024 opened a new issue, #1730: URL: https://github.com/apache/fluss/issues/1730
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/fluss/issues) and found nothing similar.

### Fluss version

0.7.0 (latest release)

### Please describe the bug 🐞

I ran into a strange situation:

```java
2025-09-20 13:04:40,706 [fluss-netty-client(EPOLL)-1-2] ERROR org.apache.fluss.rpc.netty.client.ServerConnection [] - Failed to initialize authenticator
java.lang.NullPointerException: null
    at org.apache.fluss.rpc.netty.client.ServerConnection.sendInitialToken(ServerConnection.java:392) ~[fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.rpc.netty.client.ServerConnection.handleApiVersionsResponse(ServerConnection.java:386) ~[fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) [?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) [?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079) [?:?]
    at org.apache.fluss.rpc.netty.client.ServerConnection$ResponseCallback.onRequestResult(ServerConnection.java:242) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.rpc.netty.client.NettyClientHandler.channelRead(NettyClientHandler.java:119) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at org.apache.fluss.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [fluss-ali-vvr-8-shuffle.jar:0.8-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:991) [?:?]
2025-09-20 13:04:40,718 [fluss-write-sender-thread-1] WARN org.apache.fluss.client.write.Sender [] - Get error write response on table bucket TableBucket{tableId=23, partitionId=5188, bucket=30}, retrying (2147483647 attempts left). Error: NETWORK_EXCEPTION. Error Message: org.apache.fluss.exception.DisconnectException: Cannot send request to server 33.119.73.64:9123 (id: ts-81, rack: 33.93.119.46) because it is disconnected.
2025-09-20 13:04:40,719 [fluss-write-sender-thread-1] WARN org.apache.fluss.client.write.Sender [] - Received invalid metadata error in write request on bucket TableBucket{tableId=23, partitionId=5188, bucket=30}. Going to request metadata update.
org.apache.fluss.exception.NetworkException: org.apache.fluss.exception.DisconnectException: Cannot send request to server 33.119.73.64:9123 (id: ts-81, rack: 33.93.119.46) because it is disconnected.
2025-09-20 13:04:40,730 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:40,731 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:40,733 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:40,761 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:40,781 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,163 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,163 [sink-dwd_app_taobao_page_fi2-datalake_dt_rtcdm: Writer (444/512)#0] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,165 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,251 [fluss-write-sender-thread-1] WARN org.apache.fluss.client.write.Sender [] - Get error write response on table bucket TableBucket{tableId=23, partitionId=5188, bucket=30}, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION. Error Message: org.apache.fluss.exception.DisconnectException: Cannot send request to server 33.119.73.64:9123 (id: ts-81, rack: 33.93.119.46) because it is disconnected.
2025-09-20 13:04:41,251 [fluss-write-sender-thread-1] WARN org.apache.fluss.client.write.Sender [] - Received invalid metadata error in write request on bucket TableBucket{tableId=23, partitionId=5188, bucket=30}. Going to request metadata update.
org.apache.fluss.exception.NetworkException: org.apache.fluss.exception.DisconnectException: Cannot send request to server 33.119.73.64:9123 (id: ts-81, rack: 33.93.119.46) because it is disconnected.
2025-09-20 13:04:41,252 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,273 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,288 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,383 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,384 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,384 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,385 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,640 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,651 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,663 [fluss-write-sender-thread-1] WARN org.apache.fluss.client.write.Sender [] - Get error write response on table bucket TableBucket{tableId=23, partitionId=5188, bucket=30}, retrying (2147483645 attempts left). Error: NETWORK_EXCEPTION. Error Message: org.apache.fluss.exception.DisconnectException: Cannot send request to server 33.119.73.64:9123 (id: ts-81, rack: 33.93.119.46) because it is disconnected.
2025-09-20 13:04:41,664 [fluss-write-sender-thread-1] WARN org.apache.fluss.client.write.Sender [] - Received invalid metadata error in write request on bucket TableBucket{tableId=23, partitionId=5188, bucket=30}. Going to request metadata update.
org.apache.fluss.exception.NetworkException: org.apache.fluss.exception.DisconnectException: Cannot send request to server 33.119.73.64:9123 (id: ts-81, rack: 33.93.119.46) because it is disconnected.
2025-09-20 13:04:41,666 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,667 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,702 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,801 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,802 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,979 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,980 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,981 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,981 [fluss-write-sender-thread-1] INFO org.apache.fluss.security.auth.ram.RamClientAuthenticator [] - RamClientAuthenticator created with ramCredential: RamCredential{accessId='LTAI5t6WrQgrLgAKsDfHXVho', accessKey='******', actUserId='null'}
2025-09-20 13:04:41,982 [fluss-write-sender-thread-1] WARN org.apache.fluss.client.write.Sender [] - Get error write response on table bucket TableBucket{tableId=23, partitionId=5188, bucket=30}, retrying (2147483644 attempts left). Error: NETWORK_EXCEPTION. Error Message: org.apache.fluss.exception.DisconnectException: Cannot send request to server 33.119.73.64:9123 (id: ts-81, rack: 33.93.119.46) because it is disconnected.
2025-09-20 13:04:41,982 [fluss-write-sender-thread-1] WARN org.apache.fluss.client.write.Sender [] - Received invalid metadata error in write request on bucket TableBucket{tableId=23, partitionId=5188, bucket=30}. Going to request metadata update.
```

When the NullPointerException is thrown, the connection is closed. However, every retried send request then fails, forever. So there are two questions to answer:

1. Why does the NullPointerException occur?
2. Even if the NullPointerException occurs, why do the retried send requests fail forever? By design, when a problem occurs the client should recreate the connection and retry.

### Solution

## Why does the NullPointerException occur?

Same as https://github.com/apache/fluss/pull/781: we need to put `bootstrap.connect(node.host(), node.port()).addListener(future -> establishConnection((ChannelFuture) future, isInnerClient));` as the final statement. Otherwise, the authenticator may still be unassigned when the connection is established.

## Why do the retried send requests fail forever?

When a problem occurs in the constructor of `ServerConnection`, `whenClose` has not been set yet. Thus the closed connection is never removed from `connections` and is reused later.
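To make this second failure mode concrete, here is a minimal, single-threaded sketch. It is not Fluss code: `Conn`, `getOrCreate`, `staleEntrySurvives`, and the `runLateCallback` flag are hypothetical names introduced for illustration. It assumes the close callback is a plain field that is only invoked from `close()`, which may not match how `ServerConnection` is actually implemented. It also uses a plain `get`/`put` instead of `computeIfAbsent`, so that removing the entry from inside the callback stays simple. One possible remedy, shown behind the flag, is to run the callback immediately when it is registered on a connection that is already closed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class StaleConnectionSketch {

    /** Hypothetical stand-in for a client connection; not the Fluss ServerConnection. */
    static final class Conn {
        private final boolean runLateCallback;
        private boolean closed;
        private Consumer<Conn> closeCallback;

        Conn(boolean failInConstructor, boolean runLateCallback) {
            this.runLateCallback = runLateCallback;
            if (failInConstructor) {
                // Mimics a failure during construction: close() runs
                // before anyone had a chance to call whenClose().
                close();
            }
        }

        void close() {
            closed = true;
            if (closeCallback != null) {
                closeCallback.accept(this);
            }
        }

        void whenClose(Consumer<Conn> callback) {
            closeCallback = callback;
            // Assumed remedy: fire at once if the connection is already closed.
            if (closed && runLateCallback) {
                callback.accept(this);
            }
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Returns the cached connection for serverId, creating one if absent. */
    static Conn getOrCreate(
            Map<String, Conn> cache,
            String serverId,
            boolean failInConstructor,
            boolean runLateCallback) {
        Conn cached = cache.get(serverId);
        if (cached != null) {
            return cached;
        }
        Conn created = new Conn(failInConstructor, runLateCallback);
        cache.put(serverId, created);
        // Registered only after construction, as in the quoted method.
        created.whenClose(c -> cache.remove(serverId, c));
        return created;
    }

    /** True if a retry gets back the same, already closed connection. */
    static boolean staleEntrySurvives(boolean runLateCallback) {
        Map<String, Conn> cache = new ConcurrentHashMap<>();
        Conn first = getOrCreate(cache, "ts-81", true, runLateCallback);
        Conn retry = getOrCreate(cache, "ts-81", false, runLateCallback);
        return retry == first && retry.isClosed();
    }

    public static void main(String[] args) {
        System.out.println(staleEntrySurvives(false)); // prints "true"
        System.out.println(staleEntrySurvives(true));  // prints "false"
    }
}
```

Without the late callback, the retry receives the same closed connection, which would be consistent with the endless `DisconnectException` retries in the log. With it, the stale entry is evicted and the retry gets a fresh connection. The method the issue points at is quoted next.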
```java
private ServerConnection getOrCreateConnection(ServerNode node) {
    String serverId = node.uid();
    return connections.computeIfAbsent(
            serverId,
            ignored -> {
                LOG.debug("Creating connection to server {}.", node);
                ServerConnection connection =
                        new ServerConnection(
                                bootstrap,
                                node,
                                clientMetricGroup,
                                authenticatorSupplier.get(),
                                isInnerClient);
                // whenClose is registered only after the constructor returns, so a
                // failure inside the constructor happens before this callback is set.
                connection.whenClose(ignore -> connections.remove(serverId, connection));
                return connection;
            });
}
```

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
