This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d7bac05 Add PulsarClient#isClosed method (#8428)
d7bac05 is described below
commit d7bac0571c70876d7939ee1d2efaeaa3a7581517
Author: Enrico Olivelli <[email protected]>
AuthorDate: Fri Nov 6 01:42:00 2020 +0100
Add PulsarClient#isClosed method (#8428)
Currently there is no way to know whether the Pulsar client has already
been closed, resulting in AlreadyClosedException errors.
---
.../org/apache/pulsar/client/api/PulsarClient.java | 9 ++++
.../pulsar/client/impl/PulsarClientImpl.java | 30 ++++++++++----
.../pulsar/client/impl/PulsarClientImplTest.java | 48 ++++++++++++++++++++++
3 files changed, 78 insertions(+), 9 deletions(-)
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index fdbf41e..fa2254c 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -282,4 +282,13 @@ public interface PulsarClient extends Closeable {
* if the forceful shutdown fails
*/
void shutdown() throws PulsarClientException;
+
+ /**
+ * Return internal state of the client. Useful if you want to check that
current client is valid.
+ * @return true if the client has been closed
+ * @see #shutdown()
+ * @see #close()
+ * @see #closeAsync()
+ */
+ boolean isClosed();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 64f1ef9..3c0b105 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -41,10 +41,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -295,7 +293,7 @@ public class PulsarClientImpl implements PulsarClient {
} else {
producer = new ProducerImpl<>(PulsarClientImpl.this, topic,
conf, producerCreatedFuture, -1, schema, interceptors);
}
-
+
producers.add(producer);
}).exceptionally(ex -> {
log.warn("[{}] Failed to get partitioned topic metadata: {}",
topic, ex.getMessage());
@@ -385,7 +383,7 @@ public class PulsarClientImpl implements PulsarClient {
consumerSubscribedFuture,null, schema, interceptors,
true /* createTopicIfDoesNotExist */);
}
-
+
consumers.add(consumer);
}).exceptionally(ex -> {
log.warn("[{}] Failed to get partitioned topic metadata", topic,
ex);
@@ -402,7 +400,7 @@ public class PulsarClientImpl implements PulsarClient {
ConsumerBase<T> consumer = new
MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
externalExecutorProvider.getExecutor(),
consumerSubscribedFuture, schema, interceptors,
true /* createTopicIfDoesNotExist */);
-
+
consumers.add(consumer);
return consumerSubscribedFuture;
@@ -436,7 +434,7 @@ public class PulsarClientImpl implements PulsarClient {
externalExecutorProvider.getExecutor(),
consumerSubscribedFuture,
schema, subscriptionMode, interceptors);
-
+
consumers.add(consumer);
})
.exceptionally(ex -> {
@@ -508,7 +506,7 @@ public class PulsarClientImpl implements PulsarClient {
// gets the next single threaded executor from the list of
executors
ExecutorService listenerThread =
externalExecutorProvider.getExecutor();
ReaderImpl<T> reader = new ReaderImpl<>(PulsarClientImpl.this,
conf, listenerThread, consumerSubscribedFuture, schema);
-
+
consumers.add(reader.getConsumer());
consumerSubscribedFuture.thenRun(() -> {
@@ -548,8 +546,16 @@ public class PulsarClientImpl implements PulsarClient {
public void close() throws PulsarClientException {
try {
closeAsync().get();
- } catch (Exception e) {
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw PulsarClientException.unwrap(e);
+ } catch (ExecutionException e) {
+ PulsarClientException unwrapped = PulsarClientException.unwrap(e);
+ if (unwrapped instanceof
PulsarClientException.AlreadyClosedException) {
+ // this is not a problem
+ return;
+ }
+ throw unwrapped;
}
}
@@ -562,7 +568,7 @@ public class PulsarClientImpl implements PulsarClient {
final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
List<CompletableFuture<Void>> futures = Lists.newArrayList();
-
+
producers.forEach(p -> futures.add(p.closeAsync()));
consumers.forEach(c -> futures.add(c.closeAsync()));
@@ -603,6 +609,12 @@ public class PulsarClientImpl implements PulsarClient {
}
@Override
+ public boolean isClosed() {
+ State currentState = state.get();
+ return currentState == State.Closed || currentState == State.Closing;
+ }
+
+ @Override
public synchronized void updateServiceUrl(String serviceUrl) throws
PulsarClientException {
log.info("Updating service URL to {}", serviceUrl);
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
new file mode 100644
index 0000000..2d114d0
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.concurrent.ThreadFactory;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.testng.annotations.Test;
+
+/**
+ * PulsarClientImpl unit tests.
+ */
+public class PulsarClientImplTest {
+
+ @Test
+ public void testIsClosed() throws Exception {
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setServiceUrl("pulsar://localhost:6650");
+ ThreadFactory threadFactory = new
DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
+ EventLoopGroup eventLoopGroup =
EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
+ PulsarClientImpl clientImpl = new PulsarClientImpl(conf,
eventLoopGroup);
+ assertFalse(clientImpl.isClosed());
+ clientImpl.close();
+ assertTrue(clientImpl.isClosed());
+ eventLoopGroup.shutdownGracefully().get();
+ }
+
+}