Richard-Yi commented on issue #13687:
URL: https://github.com/apache/pulsar/issues/13687#issuecomment-1008544170
@RobertIndie Can you show some example code?
My code is as follows:
```java
public CompletableFuture<Topic> getTopic(String topicName, boolean createIfMissing) {
    CompletableFuture<Topic> topicCompletableFuture = new CompletableFuture<>();
    if (null == pulsarService) {
        log.error("PulsarService is not set.");
        topicCompletableFuture.completeExceptionally(
                new PulsarServerException("PulsarService is not set."));
        return topicCompletableFuture;
    }
    // setup ownership of service unit to this broker
    LookupOptions lookupOptions = LookupOptions.builder().authoritative(true).build();
    pulsarService.getNamespaceService()
            .getBrokerServiceUrlAsync(TopicName.get(topicName), lookupOptions)
            .whenComplete((addr, th) -> {
                log.info("Find getBrokerServiceUrl {}, return Topic: {}", addr, topicName);
                if (th != null || addr == null || addr.get() == null) {
                    log.warn("Failed getBrokerServiceUrl {}, return null Topic. throwable: ",
                            topicName, th);
                    topicCompletableFuture.complete(null);
                    return;
                }
                if (log.isDebugEnabled()) {
                    log.debug("getBrokerServiceUrl for {} in ExchangeTopicManager. brokerAddress: {}",
                            topicName, addr.get().getLookupData().getBrokerUrl());
                }
                pulsarService.getBrokerService().getTopic(topicName, createIfMissing)
                        .whenComplete((topicOptional, throwable) -> {
                            if (throwable != null) {
                                log.error("Failed to getTopic {}. exception: {}", topicName, throwable);
                                topicCompletableFuture.complete(null);
                                return;
                            }
                            try {
                                if (topicOptional.isPresent()) {
                                    Topic topic = topicOptional.get();
                                    AbstractTopic abstractTopic = (AbstractTopic) topic;
                                    abstractTopic.setDeleteWhileInactive(false);
                                    topicCompletableFuture.complete(topic);
                                } else {
                                    log.error("Get empty topic for name {}", topicName);
                                    topicCompletableFuture.complete(null);
                                }
                            } catch (Exception e) {
                                log.error("Failed to get client in registerInPersistentTopic {}. "
                                        + "exception:", topicName, e);
                                topicCompletableFuture.complete(null);
                            }
                        });
            });
    return topicCompletableFuture;
}
```
After getting the topic, I publish a message to it:
```java
getTopic(...).whenComplete((topic, throwable) -> {
    if (throwable != null) {
        log.error("get message topic error.", throwable);
        return;
    }
    TypedMessageBuilderImpl<byte[]> builder = new TypedMessageBuilderImpl<>(null, Schema.BYTES);
    builder.value(new byte[0])
            .property(MESSAGE_ID, messageIdAsString == null ? "" : messageIdAsString)
            .property(LEDGER_ID, String.valueOf(position.getLedgerId()))
            .property(EXCHANGE_TOPIC, exchangeTopicName)
            .property(ENTRY_ID, String.valueOf(position.getEntryId()));
    Message<byte[]> builderMessage = builder.getMessage();
    ByteBuf byteBuf = MessageConvertUtils.messageToByteBuf(builderMessage);
    try {
        topic.publishMessage(byteBuf, (e, ledgerId1, entryId1) -> {
            if (e != null) {
                log.error("message index completed error,ledgerId:{} entryId:{}",
                        ledgerId1, entryId1, e);
            }
        });
    } finally {
        ReferenceCountUtil.release(byteBuf);
    }
});
```
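One detail of the two snippets that may matter: apart from the `pulsarService == null` case, `getTopic` reports failures by completing the future with `null`, not exceptionally. The caller's `throwable != null` check therefore does not catch them, and `topic` can be `null` when `publishMessage` is called. Below is a minimal sketch of that behaviour using only `java.util.concurrent.CompletableFuture`. The class `NullCompletionDemo` and its `lookup` and `handle` methods are hypothetical stand-ins for illustration, not Pulsar APIs, and the topic name is made up.

```java
import java.util.concurrent.CompletableFuture;

class NullCompletionDemo {

    // Hypothetical stand-in for getTopic(...): a failure is reported by
    // completing the future with null, not by completing it exceptionally.
    static CompletableFuture<String> lookup(boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (fail) {
            future.complete(null);
        } else {
            future.complete("persistent://public/default/demo");
        }
        return future;
    }

    // Caller-side handling that checks both the throwable and a null result.
    // The futures used here are already completed, so the callback runs
    // synchronously on the calling thread before this method returns.
    static String handle(CompletableFuture<String> future) {
        StringBuilder outcome = new StringBuilder();
        future.whenComplete((topic, throwable) -> {
            if (throwable != null) {
                outcome.append("error");
            } else if (topic == null) {
                outcome.append("missing");
            } else {
                outcome.append("ok");
            }
        });
        return outcome.toString();
    }

    public static void main(String[] args) {
        System.out.println(handle(lookup(true)));  // failure path: null result, no throwable
        System.out.println(handle(lookup(false))); // success path
    }
}
```

In the publishing snippet, the equivalent would be a `topic == null` check before building the message; whether that is related to the problem in this issue is not something the sketch can show.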