vivekkrj commented on issue #4988: Producer create method blocked forever 
URL: https://github.com/apache/pulsar/issues/4988#issuecomment-528108316
 
 
   Here is the sample code for the client and producer blocks:
   
   /**
         * Builds the {@link PulsarClient} used for producing, configured from
         * {@code producerOptionBean}.
         *
         * <p>The broker service URL, the trust-certificate file path and the
         * allow-insecure-TLS flag are applied in every case. When
         * {@code producerOptionBean.isAuthenticationEnable()} returns true, a TLS
         * {@link Authentication} created from the configured client certificate and
         * key files is attached as well, and TLS hostname verification is turned off.
         *
         * @return the newly built client
         * @throws PulsarClientException propagated from the Pulsar client builder
         */
        private PulsarClient getProducerClient() throws PulsarClientException {
                // Plain path: no client authentication configured.
                if (!producerOptionBean.isAuthenticationEnable()) {
                        return PulsarClient.builder()
                                        .serviceUrl(producerOptionBean.getPulsarBrokerUrl())
                                        .tlsTrustCertsFilePath(producerOptionBean.getTrustCa())
                                        .allowTlsInsecureConnection(producerOptionBean.isAllowTlsInsecureConnection())
                                        .build();
                }

                // Authenticated path: the TLS credentials are created before the builder is touched.
                Authentication tlsAuthentication = AuthenticationFactory.TLS(
                                producerOptionBean.getClientCertificateFile(),
                                producerOptionBean.getClientCertificateKey());

                return PulsarClient.builder()
                                .serviceUrl(producerOptionBean.getPulsarBrokerUrl())
                                .tlsTrustCertsFilePath(producerOptionBean.getTrustCa())
                                .allowTlsInsecureConnection(producerOptionBean.isAllowTlsInsecureConnection())
                                // Hostname verification is explicitly disabled on this path only.
                                .enableTlsHostnameVerification(false)
                                .authentication(tlsAuthentication)
                                .build();
        }
   
   /**
         * Creates a {@code byte[]} producer on the given client for the topic returned by
         * {@code producerOptionBean.getPulsarTopic()}.
         *
         * <p>Batching, compression, pending-message limits and queue-full blocking are all
         * taken from {@code producerOptionBean} (compression via {@code getCompressionType()}).
         * The batching publish delay is passed in milliseconds and the send timeout in seconds.
         *
         * @param client       the client on which the producer is created
         * @param producerName the name assigned to the producer
         * @return the producer returned by {@code create()}
         * @throws PulsarClientException propagated from the producer builder's {@code create()}
         */
        private Producer<byte[]> buildProducer(PulsarClient client, String producerName)
                        throws PulsarClientException {
                return client.newProducer()
                                .topic(producerOptionBean.getPulsarTopic())
                                .producerName(producerName)
                                // Batching switch and its maximum publish delay (milliseconds).
                                .enableBatching(producerOptionBean.isEnableBatching())
                                .batchingMaxPublishDelay(
                                                producerOptionBean.getBatchMessageMaxDelay(),
                                                TimeUnit.MILLISECONDS)
                                // Send timeout is expressed in seconds.
                                .sendTimeout(producerOptionBean.getMsgAckTimeout(), TimeUnit.SECONDS)
                                .compressionType(getCompressionType())
                                .maxPendingMessages(producerOptionBean.getMaxPendingMessages())
                                .batchingMaxMessages(producerOptionBean.getBatchingMaxMessages())
                                .blockIfQueueFull(producerOptionBean.isBlockIfQueueFull())
                                .create();
        }
   
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to