TheDungNTU opened a new issue, #20731: URL: https://github.com/apache/pulsar/issues/20731
### Search before asking - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar. ### Version - OS: Distributor ID: Ubuntu Description: Ubuntu 20.04.5 LTS Release: 20.04 Codename: focal - Flink: 1.12.2 - Pulsar: 2.10.2 - Library used: org.apache.pulsar:pulsar-flink:2.6.4 ### Minimal reproduce step **Describe the bug** We are trying to start a Flink Stream using the Flink-connector, but there seems to be an issue with the CachedPulsarClient (AlreadyClosedException on the Pulsar client of the FlinkPulsarSink). **To Reproduce** The Flink Stream have 2 sources (2 FlinkPulsarSource) and a FlinkPulsarSink as output. Both FlinkPulsarSource instanciate a PulsarClient in the CachedPulsarClient (one per source, no matter the parallelism value), the FlinkPulsarSink instanciates one PulsarClient per slot used (3 instances with a parallelism set to 3). The CachedPulsarClient have a maximum cache size of 5 by default, so until a parallelism of 3 there is no issue (1 client for each source, and 3 clients for the sink), but when the parallelism is set to a greater value than the size of the cache, we have this exception : Caused by: org.apache.pulsar.client.api.PulsarClientException$AlreadyClosedException: Client already closed : state = Closing at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:870) at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:93) at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase.createProducer(FlinkPulsarSinkBase.java:264) The client appears to be closed while still in use. We tried to change the clientcachesize parameter to increase the number of clients stored in the cache, but the parameter doesn't seem to work (only 5 clients seem to be stored while debugging), just a guess : the setCacheSize method on the CachedPulsarClient set a static value but as the guavaCache is also a static value of the same class, it's already set before the cacheSize could be changed. ### What did you expect to see? How can I change this parameter clientcachesize    ### What did you see instead? ***Please help me*** ### Anything else? _No response_ ### Are you willing to submit a PR? - [ ] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
