ThomasTaketurns commented on issue #19784:
URL: https://github.com/apache/pulsar/issues/19784#issuecomment-1466117036

   @lifepuzzlefun , thanks for getting back to me.
   
   Regarding pulsar client : 
   
        public void initPulsarClient() throws TakeTurnsTechnicalException {
                try {
                        log.info("Creating client to Pulsar with url: {}", 
this.pulsarConfig.getUrl());
                        this.client = PulsarClient.builder()
                                        .serviceUrl(this.pulsarConfig.getUrl())
                                        .enableTlsHostnameVerification(false)
                                        .allowTlsInsecureConnection(true)
                                        .enableTransaction(true)
                                        
.listenerThreads(this.pulsarConfig.getConsumerThreads())
                                        .build();
                } catch (final PulsarClientException e) {
                        throw new TakeTurnsTechnicalException(e.getMessage(), 
e);
                }
        }
   
   And for producers : 
   
        public void connectPulsarProducer() throws TakeTurnsTechnicalException {
                if (this.pulsarConfig.getProducer().booleanValue()) {
                        if (this.producer != null && 
this.producer.isConnected()) {
                                throw new TakeTurnsTechnicalException("A Pulsar 
producer is already connected");
                        }
                        try {
                                log.info("Creating producer to Pulsar topic: 
{}", this.getFullTopicReference());
                                this.producer = 
this.getClient().newProducer(SCHEMA).autoUpdatePartitions(true)
                                                
.topic(this.getFullTopicReference()).blockIfQueueFull(true).enableBatching(false)
                                                .sendTimeout(0, 
TimeUnit.MILLISECONDS).intercept(oTelProducerInterceptor).create();
                                log.info("Producer connected to topic : {}", 
this.getFullTopicReference());
                        } catch (final PulsarClientException e) {
                                throw new 
TakeTurnsTechnicalException(e.getMessage(), e);
                        }
                }
        }
   
   We do use schemas.
   
   private static final AvroSchema<PulsarEvent> SCHEMA = 
AvroSchema.of(PulsarEvent.class);
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to