GenerousMan commented on code in PR #6568:
URL: https://github.com/apache/rocketmq/pull/6568#discussion_r1163538597
##########
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java:
##########
@@ -153,6 +157,38 @@ public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook
             semaphoreAsyncSendSize = new Semaphore(1024 * 1024, true);
             log.info("semaphoreAsyncSendSize can not be smaller than 1M.");
         }
+
+        ServiceDetector serviceDetector = new ServiceDetector() {
+            @Override
+            public boolean detect(String endpoint, long timeoutMillis) {
+                Optional<String> candidateTopic = pickTopic();
+                if (!candidateTopic.isPresent()) {
+                    return false;
+                }
+                try {
+                    MessageQueue mq = new MessageQueue(candidateTopic.get(), null, 0);
+                    mQClientFactory.getMQClientAPIImpl()
+                        .getMaxOffset(endpoint, mq, timeoutMillis);
+                    return true;
+                } catch (Exception e) {
+                    return false;
+                }
+            }
+        };
Review Comment:
Very elegant solution. I will make the change as you suggested.