eolivelli commented on a change in pull request #9308:
URL: https://github.com/apache/pulsar/pull/9308#discussion_r601221360
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -640,8 +643,29 @@ public synchronized void
setupBrokerPublishRateLimiterMonitor() {
}
}
- @Override
public void close() throws IOException {
+ try {
+ closeAsync().get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ } else {
+ throw new PulsarServerException(e.getCause());
+ }
+ } catch (InterruptedException e) {
+ throw new PulsarServerException(e);
Review comment:
I would write here:
`Thread.currentThread().interrupt();`
and do not throw an exception, this way we are existing the procedure as
expected and the caller is able to deal with the Interrupted status of the
Thread
We can get rid of the "throws PulsarServerException" clause as well
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
##########
@@ -296,16 +299,37 @@ public MetadataStoreExtended
createConfigurationMetadataStore() throws MetadataS
.build());
}
+ @Override
+ public void close() throws PulsarServerException {
+ try {
+ closeAsync().get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof PulsarServerException) {
+ throw (PulsarServerException) e.getCause();
+ } else {
+ throw new PulsarServerException(e.getCause());
+ }
+ } catch (InterruptedException e) {
+ throw new PulsarServerException(e);
Review comment:
I would write here:
`Thread.currentThread().interrupt();`
and do not throw an exception, this way we are existing the procedure as
expected and the caller is able to deal with the Interrupted status of the
Thread
We can get rid of the "throws PulsarServerException" clause as well
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -640,8 +643,29 @@ public synchronized void
setupBrokerPublishRateLimiterMonitor() {
}
}
- @Override
public void close() throws IOException {
+ try {
+ closeAsync().get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ } else {
+ throw new PulsarServerException(e.getCause());
+ }
+ } catch (InterruptedException e) {
+ throw new PulsarServerException(e);
+ }
+ }
+
+ public CompletableFuture<Void> closeAsync() {
+ try {
+ return doCloseAsync();
+ } catch (IOException e) {
+ return FutureUtil.failedFuture(e);
+ }
+ }
+
+ private CompletableFuture<Void> doCloseAsync() throws IOException {
Review comment:
we should not "throw IOException" but return a failed future
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
##########
@@ -429,6 +454,10 @@ public void close() throws PulsarServerException {
state = State.Closed;
isClosedCondition.signalAll();
+ CompletableFuture<Void> shutdownFuture =
+ CompletableFuture.allOf(asyncCloseFutures.toArray(new
CompletableFuture[0]));
+ closeFutureReference.set(shutdownFuture);
+ return shutdownFuture;
} catch (Exception e) {
if (e instanceof CompletionException && e.getCause() instanceof
MetadataStoreException) {
throw new
PulsarServerException(MetadataStoreException.unwrap((CompletionException) e));
Review comment:
what about doing this way ?
`return FutureUtil.failedFuture( new
PulsarServerException(MetadataStoreException.unwrap((CompletionException) e)));`
the caller expects to receive a failed Future
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]