TheDungNTU opened a new issue, #20731:
URL: https://github.com/apache/pulsar/issues/20731

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Version
   
   - OS: 
     Distributor ID:    Ubuntu
     Description:       Ubuntu 20.04.5 LTS
     Release:   20.04
     Codename:  focal
   - Flink: 1.12.2
   - Pulsar: 2.10.2
   - Library used: org.apache.pulsar:pulsar-flink:2.6.4
   
   ### Minimal reproduce step
   
   **Describe the bug**
   We are trying to start a Flink Stream using the Flink-connector, but there 
seems to be an issue with the CachedPulsarClient (AlreadyClosedException on the 
Pulsar client of the FlinkPulsarSink).
   
   **To Reproduce**
   The Flink Stream have 2 sources (2 FlinkPulsarSource) and a FlinkPulsarSink 
as output. Both FlinkPulsarSource instanciate a PulsarClient in the 
CachedPulsarClient (one per source, no matter the parallelism value), the 
FlinkPulsarSink instanciates one PulsarClient per slot used (3 instances with a 
parallelism set to 3). The CachedPulsarClient have a maximum cache size of 5 by 
default, so until a parallelism of 3 there is no issue (1 client for each 
source, and 3 clients for the sink), but when the parallelism is set to a 
greater value than the size of the cache, we have this exception :
   
   Caused by: 
org.apache.pulsar.client.api.PulsarClientException$AlreadyClosedException: 
Client already closed : state = 
   Closing
        at 
org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:870)
        at 
org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:93)
        at 
org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase.createProducer(FlinkPulsarSinkBase.java:264)
   
   The client appears to be closed while still in use. We tried to change the 
clientcachesize parameter to increase the number of clients stored in the 
cache, but the parameter doesn't seem to work (only 5 clients seem to be stored 
while debugging), just a guess : the setCacheSize method on the 
CachedPulsarClient set a static value but as the guavaCache is also a static 
value of the same class, it's already set before the cacheSize could be changed.
   
   
   
   
   
   
   
   
   
   ### What did you expect to see?
   
   How can I change this parameter clientcachesize 
   
![image](https://github.com/apache/pulsar/assets/56428954/f7943f3f-5575-429e-be4a-dbbbe6eb9a8f)
   
![image](https://github.com/apache/pulsar/assets/56428954/4477d978-e173-4f10-a90a-b3ed3c405234)
   
![image](https://github.com/apache/pulsar/assets/56428954/953ace25-f5df-4731-9401-0e154c45f5ca)
   
   ### What did you see instead?
   
   ***Please help me***
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to