This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9605aed add timeout to internal rest api (#4762)
9605aed is described below
commit 9605aede8aa1e14783d480a2fc6a55fe8ceaa67a
Author: Yuto Furuta <[email protected]>
AuthorDate: Fri Jul 19 22:58:03 2019 +0900
add timeout to internal rest api (#4762)
---
.../admin/internal/NonPersistentTopicsImpl.java | 30 ++++++++++++----
.../pulsar/client/admin/internal/TopicsImpl.java | 40 ++++++++++++++++------
2 files changed, 53 insertions(+), 17 deletions(-)
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
index 8e81473..d9b23c8 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
@@ -23,6 +23,8 @@ import static
com.google.common.base.Preconditions.checkArgument;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
@@ -52,12 +54,14 @@ public class NonPersistentTopicsImpl extends BaseResource
implements NonPersiste
@Override
public void createPartitionedTopic(String topic, int numPartitions) throws
PulsarAdminException {
try {
- createPartitionedTopicAsync(topic, numPartitions).get();
+ createPartitionedTopicAsync(topic,
numPartitions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -72,12 +76,14 @@ public class NonPersistentTopicsImpl extends BaseResource
implements NonPersiste
@Override
public PartitionedTopicMetadata getPartitionedTopicMetadata(String topic)
throws PulsarAdminException {
try {
- return getPartitionedTopicMetadataAsync(topic).get();
+ return
getPartitionedTopicMetadataAsync(topic).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -105,12 +111,14 @@ public class NonPersistentTopicsImpl extends BaseResource
implements NonPersiste
@Override
public NonPersistentTopicStats getStats(String topic) throws
PulsarAdminException {
try {
- return getStatsAsync(topic).get();
+ return getStatsAsync(topic).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -138,12 +146,14 @@ public class NonPersistentTopicsImpl extends BaseResource
implements NonPersiste
@Override
public PersistentTopicInternalStats getInternalStats(String topic) throws
PulsarAdminException {
try {
- return getInternalStatsAsync(topic).get();
+ return getInternalStatsAsync(topic).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -171,12 +181,14 @@ public class NonPersistentTopicsImpl extends BaseResource
implements NonPersiste
@Override
public void unload(String topic) throws PulsarAdminException {
try {
- unloadAsync(topic).get();
+ unloadAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -190,12 +202,14 @@ public class NonPersistentTopicsImpl extends BaseResource
implements NonPersiste
@Override
public List<String> getListInBundle(String namespace, String bundleRange)
throws PulsarAdminException {
try {
- return getListInBundleAsync(namespace, bundleRange).get();
+ return getListInBundleAsync(namespace,
bundleRange).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -221,12 +235,14 @@ public class NonPersistentTopicsImpl extends BaseResource
implements NonPersiste
@Override
public List<String> getList(String namespace) throws PulsarAdminException {
try {
- return getListAsync(namespace).get();
+ return getListAsync(namespace).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 50f3c69..79b9212 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -131,12 +131,14 @@ public class TopicsImpl extends BaseResource implements
Topics {
@Override
public List<String> getListInBundle(String namespace, String bundleRange)
throws PulsarAdminException {
try {
- return getListInBundleAsync(namespace, bundleRange).get();
+ return getListInBundleAsync(namespace,
bundleRange).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -197,24 +199,28 @@ public class TopicsImpl extends BaseResource implements
Topics {
@Override
public void createPartitionedTopic(String topic, int numPartitions) throws
PulsarAdminException {
try {
- createPartitionedTopicAsync(topic, numPartitions).get();
+ createPartitionedTopicAsync(topic,
numPartitions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public void createNonPartitionedTopic(String topic) throws
PulsarAdminException {
try {
- createNonPartitionedTopicAsync(topic).get();
+ createNonPartitionedTopicAsync(topic).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -236,12 +242,14 @@ public class TopicsImpl extends BaseResource implements
Topics {
@Override
public void updatePartitionedTopic(String topic, int numPartitions) throws
PulsarAdminException {
try {
- updatePartitionedTopicAsync(topic, numPartitions).get();
+ updatePartitionedTopicAsync(topic,
numPartitions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -256,12 +264,14 @@ public class TopicsImpl extends BaseResource implements
Topics {
@Override
public PartitionedTopicMetadata getPartitionedTopicMetadata(String topic)
throws PulsarAdminException {
try {
- return getPartitionedTopicMetadataAsync(topic).get();
+ return
getPartitionedTopicMetadataAsync(topic).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -294,12 +304,14 @@ public class TopicsImpl extends BaseResource implements
Topics {
@Override
public void deletePartitionedTopic(String topic, boolean force) throws
PulsarAdminException {
try {
- deletePartitionedTopicAsync(topic, force).get();
+ deletePartitionedTopicAsync(topic, force).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -319,12 +331,14 @@ public class TopicsImpl extends BaseResource implements
Topics {
@Override
public void delete(String topic, boolean force) throws
PulsarAdminException {
try {
- deleteAsync(topic, force).get();
+ deleteAsync(topic, force).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -339,12 +353,14 @@ public class TopicsImpl extends BaseResource implements
Topics {
@Override
public void unload(String topic) throws PulsarAdminException {
try {
- unloadAsync(topic).get();
+ unloadAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -358,12 +374,14 @@ public class TopicsImpl extends BaseResource implements
Topics {
@Override
public List<String> getSubscriptions(String topic) throws
PulsarAdminException {
try {
- return getSubscriptionsAsync(topic).get();
+ return getSubscriptionsAsync(topic).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@@ -595,12 +613,14 @@ public class TopicsImpl extends BaseResource implements
Topics {
@Override
public void skipAllMessages(String topic, String subName) throws
PulsarAdminException {
try {
- skipAllMessagesAsync(topic, subName).get();
+ skipAllMessagesAsync(topic, subName).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}