rdhabalia commented on a change in pull request #9485:
URL: https://github.com/apache/pulsar/pull/9485#discussion_r578144985
##########
File path:
pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
##########
@@ -494,7 +494,11 @@ public void stop() throws Exception {
LOG.debug("Local ZK/BK stopping ...");
for (BookieServer bookie : bs) {
- bookie.shutdown();
+ try {
+ bookie.shutdown();
+ } catch (Exception e) {
+ LOG.warn("failed to shutdown bookie", e);
Review comment:
it doesn't throw `InterruptedException` but in some cases it was
throwing other runtime exception in test, while closing the resources. So, it
helps test to clean other resources by handling this failure.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -247,29 +241,31 @@ public void validatePoliciesReadOnlyAccess() {
private CompletableFuture<Void> tryCreatePartitionAsync(final int
partition, CompletableFuture<Void> reuseFuture) {
CompletableFuture<Void> result = reuseFuture == null ? new
CompletableFuture<>() : reuseFuture;
- zkCreateOptimisticAsync(localZk(),
-
ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0],
- (rc, s, o, s1) -> {
- if (KeeperException.Code.OK.intValue() == rc) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Topic partition {} created.",
clientAppId(),
- topicName.getPartition(partition));
- }
- result.complete(null);
- } else if (KeeperException.Code.NODEEXISTS.intValue() ==
rc) {
+ namespaceResources().getLocalStore()
+
.put(ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new
byte[0], Optional.of(-1L))
+ .thenAccept(r -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Topic partition {} created.",
clientAppId(), topicName.getPartition(partition));
+ }
+ result.complete(null);
+ }).exceptionally(ex -> {
+ if (ex.getCause() instanceof AlreadyExistsException) {
log.info("[{}] Topic partition {} is exists, doing
nothing.", clientAppId(),
topicName.getPartition(partition));
result.complete(null);
- } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
- log.warn("[{}] Fail to create topic partition {} with
concurrent modification, retry now.",
- clientAppId(), topicName.getPartition(partition));
- tryCreatePartitionAsync(partition, result);
- } else {
- log.error("[{}] Fail to create topic partition {}",
clientAppId(),
- topicName.getPartition(partition),
KeeperException.create(KeeperException.Code.get(rc)));
-
result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
- }
- });
+ } else if (ex.getCause() instanceof BadVersionException) {
+ log.warn("[{}] Fail to create topic partition {} with
concurrent modification, retry now.",
Review comment:
yes, good catch. it's related to #9501 , I will fix log.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]