This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9605aed  add timeout to internal rest api (#4762)
9605aed is described below

commit 9605aede8aa1e14783d480a2fc6a55fe8ceaa67a
Author: Yuto Furuta <[email protected]>
AuthorDate: Fri Jul 19 22:58:03 2019 +0900

    add timeout to internal rest api (#4762)
---
 .../admin/internal/NonPersistentTopicsImpl.java    | 30 ++++++++++++----
 .../pulsar/client/admin/internal/TopicsImpl.java   | 40 ++++++++++++++++------
 2 files changed, 53 insertions(+), 17 deletions(-)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
index 8e81473..d9b23c8 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java
@@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkArgument;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.InvocationCallback;
@@ -52,12 +54,14 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
     @Override
     public void createPartitionedTopic(String topic, int numPartitions) throws 
PulsarAdminException {
         try {
-            createPartitionedTopicAsync(topic, numPartitions).get();
+            createPartitionedTopicAsync(topic, 
numPartitions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -72,12 +76,14 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
     @Override
     public PartitionedTopicMetadata getPartitionedTopicMetadata(String topic) 
throws PulsarAdminException {
         try {
-            return getPartitionedTopicMetadataAsync(topic).get();
+            return 
getPartitionedTopicMetadataAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -105,12 +111,14 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
     @Override
     public NonPersistentTopicStats getStats(String topic) throws 
PulsarAdminException {
         try {
-            return getStatsAsync(topic).get();
+            return getStatsAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -138,12 +146,14 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
     @Override
     public PersistentTopicInternalStats getInternalStats(String topic) throws 
PulsarAdminException {
         try {
-            return getInternalStatsAsync(topic).get();
+            return getInternalStatsAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -171,12 +181,14 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
     @Override
     public void unload(String topic) throws PulsarAdminException {
         try {
-            unloadAsync(topic).get();
+            unloadAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -190,12 +202,14 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
     @Override
     public List<String> getListInBundle(String namespace, String bundleRange) 
throws PulsarAdminException {
         try {
-            return getListInBundleAsync(namespace, bundleRange).get();
+            return getListInBundleAsync(namespace, 
bundleRange).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -221,12 +235,14 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
     @Override
     public List<String> getList(String namespace) throws PulsarAdminException {
         try {
-            return getListAsync(namespace).get();
+            return getListAsync(namespace).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 50f3c69..79b9212 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -131,12 +131,14 @@ public class TopicsImpl extends BaseResource implements Topics {
     @Override
     public List<String> getListInBundle(String namespace, String bundleRange) 
throws PulsarAdminException {
         try {
-            return getListInBundleAsync(namespace, bundleRange).get();
+            return getListInBundleAsync(namespace, 
bundleRange).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -197,24 +199,28 @@ public class TopicsImpl extends BaseResource implements Topics {
     @Override
     public void createPartitionedTopic(String topic, int numPartitions) throws 
PulsarAdminException {
         try {
-            createPartitionedTopicAsync(topic, numPartitions).get();
+            createPartitionedTopicAsync(topic, 
numPartitions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
     public void createNonPartitionedTopic(String topic) throws 
PulsarAdminException {
        try {
-               createNonPartitionedTopicAsync(topic).get();
+            createNonPartitionedTopicAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
     
@@ -236,12 +242,14 @@ public class TopicsImpl extends BaseResource implements Topics {
     @Override
     public void updatePartitionedTopic(String topic, int numPartitions) throws 
PulsarAdminException {
         try {
-            updatePartitionedTopicAsync(topic, numPartitions).get();
+            updatePartitionedTopicAsync(topic, 
numPartitions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -256,12 +264,14 @@ public class TopicsImpl extends BaseResource implements Topics {
     @Override
     public PartitionedTopicMetadata getPartitionedTopicMetadata(String topic) 
throws PulsarAdminException {
         try {
-            return getPartitionedTopicMetadataAsync(topic).get();
+            return 
getPartitionedTopicMetadataAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -294,12 +304,14 @@ public class TopicsImpl extends BaseResource implements Topics {
     @Override
     public void deletePartitionedTopic(String topic, boolean force) throws 
PulsarAdminException {
         try {
-            deletePartitionedTopicAsync(topic, force).get();
+            deletePartitionedTopicAsync(topic, force).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -319,12 +331,14 @@ public class TopicsImpl extends BaseResource implements Topics {
     @Override
     public void delete(String topic, boolean force) throws 
PulsarAdminException {
         try {
-            deleteAsync(topic, force).get();
+            deleteAsync(topic, force).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -339,12 +353,14 @@ public class TopicsImpl extends BaseResource implements Topics {
     @Override
     public void unload(String topic) throws PulsarAdminException {
         try {
-            unloadAsync(topic).get();
+            unloadAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -358,12 +374,14 @@ public class TopicsImpl extends BaseResource implements Topics {
     @Override
     public List<String> getSubscriptions(String topic) throws 
PulsarAdminException {
         try {
-            return getSubscriptionsAsync(topic).get();
+            return getSubscriptionsAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
@@ -595,12 +613,14 @@ public class TopicsImpl extends BaseResource implements Topics {
     @Override
     public void skipAllMessages(String topic, String subName) throws 
PulsarAdminException {
         try {
-            skipAllMessagesAsync(topic, subName).get();
+            skipAllMessagesAsync(topic, subName).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 

Reply via email to