gaoran10 commented on a change in pull request #8563:
URL: https://github.com/apache/pulsar/pull/8563#discussion_r522951764
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
##########
@@ -75,34 +76,48 @@ public void start() throws
TransactionCoordinatorClientException {
public CompletableFuture<Void> startAsync() {
if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) {
return
pulsarClient.getLookup().getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN)
- .thenAccept(partitionMeta -> {
+ .thenCompose(partitionMeta -> {
+ List<CompletableFuture<Void>> connectFutureList = new
ArrayList<>();
if (LOG.isDebugEnabled()) {
LOG.debug("Transaction meta store assign partition is
{}.", partitionMeta.partitions);
}
if (partitionMeta.partitions > 0) {
handlers = new
TransactionMetaStoreHandler[partitionMeta.partitions];
for (int i = 0; i < partitionMeta.partitions; i++) {
- TransactionMetaStoreHandler handler = new
TransactionMetaStoreHandler(i, pulsarClient,
-
TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString() +
TopicName.PARTITIONED_TOPIC_SUFFIX + i);
+ CompletableFuture<Void> connectFuture = new
CompletableFuture<>();
+ connectFutureList.add(connectFuture);
+ TransactionMetaStoreHandler handler = new
TransactionMetaStoreHandler(
+ i, pulsarClient, getTCAssignTopicName(i),
connectFuture);
handlers[i] = handler;
handlerMap.put(i, handler);
}
} else {
handlers = new TransactionMetaStoreHandler[1];
+ CompletableFuture<Void> connectFuture = new
CompletableFuture<>();
+ connectFutureList.add(connectFuture);
TransactionMetaStoreHandler handler = new
TransactionMetaStoreHandler(0, pulsarClient,
-
TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+ getTCAssignTopicName(-1), connectFuture);
handlers[0] = handler;
handlerMap.put(0, handler);
}
STATE_UPDATER.set(TransactionCoordinatorClientImpl.this,
State.READY);
+ return FutureUtil.waitForAll(connectFutureList);
});
} else {
return FutureUtil.failedFuture(new
CoordinatorClientStateException("Can not start while current state is " +
state));
}
}
+ private String getTCAssignTopicName(int partition) {
+ if (partition > 0) {
Review comment:
Ok, I'll fix this.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org