Richard-Yi commented on issue #13687:
URL: https://github.com/apache/pulsar/issues/13687#issuecomment-1008544170


   @RobertIndie Can you show some example code? 
   My code is as follows
   ```java
   public CompletableFuture<Topic> getTopic(String topicName, boolean 
createIfMissing) {
           CompletableFuture<Topic> topicCompletableFuture = new 
CompletableFuture<>();
           if (null == pulsarService) {
               log.error("PulsarService is not set.");
               topicCompletableFuture.completeExceptionally(new 
PulsarServerException("PulsarService is not set."));
               return topicCompletableFuture;
           }
           // setup ownership of service unit to this broker
           LookupOptions lookupOptions = 
LookupOptions.builder().authoritative(true).build();
           
pulsarService.getNamespaceService().getBrokerServiceUrlAsync(TopicName.get(topicName),
 lookupOptions).
                   whenComplete((addr, th) -> {
                       log.info("Find getBrokerServiceUrl {}, return Topic: 
{}", addr, topicName);
                       if (th != null || addr == null || addr.get() == null) {
                           log.warn("Failed getBrokerServiceUrl {}, return null 
Topic. throwable: ", topicName, th);
                           topicCompletableFuture.complete(null);
                           return;
                       }
                       if (log.isDebugEnabled()) {
                           log.debug("getBrokerServiceUrl for {} in 
ExchangeTopicManager. brokerAddress: {}",
                                   topicName, 
addr.get().getLookupData().getBrokerUrl());
                       }
                       pulsarService.getBrokerService().getTopic(topicName, 
createIfMissing)
                               .whenComplete((topicOptional, throwable) -> {
                                   if (throwable != null) {
                                       log.error("Failed to getTopic {}. 
exception: {}", topicName, throwable);
                                       topicCompletableFuture.complete(null);
                                       return;
                                   }
                                   try {
                                       if (topicOptional.isPresent()) {
                                           Topic topic = topicOptional.get();
                                           AbstractTopic abstractTopic = 
(AbstractTopic) topic;
                                           
abstractTopic.setDeleteWhileInactive(false);
                                           
topicCompletableFuture.complete(topic);
                                       } else {
                                           log.error("Get empty topic for name 
{}", topicName);
                                           
topicCompletableFuture.complete(null);
                                       }
                                   } catch (Exception e) {
                                       log.error("Failed to get client in 
registerInPersistentTopic {}. "
                                               + "exception:", topicName, e);
                                       topicCompletableFuture.complete(null);
                                   }
                               });
                   });
           return topicCompletableFuture;
       }
   ```
   after getting the topic, 
   ```java
    getTopic(...).whenComplete((topic, throwable) -> {
                                           if (throwable != null) {
                                               log.error("get message topic 
error.", throwable);
                                               return;
                                           }
                                           TypedMessageBuilderImpl<byte[]>
                                                   builder = new 
TypedMessageBuilderImpl<>(null, Schema.BYTES);
                                           builder.value(new byte[0])
                                                   .property(MESSAGE_ID,
                                                           messageIdAsString == 
null ? "" : messageIdAsString)
                                                   .property(LEDGER_ID, 
String.valueOf(position.getLedgerId()))
                                                   .property(EXCHANGE_TOPIC, 
exchangeTopicName)
                                                   .property(ENTRY_ID, 
String.valueOf(position.getEntryId()));
                                           Message<byte[]> builderMessage = 
builder.getMessage();
                                           ByteBuf byteBuf = 
MessageConvertUtils.messageToByteBuf(builderMessage);
                                           try {
                                               topic.publishMessage(byteBuf, 
(e, ledgerId1, entryId1) -> {
                                                   if (e != null) {
                                                       log.error("message index 
completed error,ledgerId:{} entryId:{}",
                                                               ledgerId1, 
entryId1, e);
                                                   }
                                               });
                                           } finally {
                                               
ReferenceCountUtil.release(byteBuf);
                                           }
                                       });
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to