This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8297c32  Pulsar Admin: reduce code duplication - part 4 (#13086)
8297c32 is described below

commit 8297c32fb24fedc0904293760c45ccb423d8fe79
Author: Enrico Olivelli <[email protected]>
AuthorDate: Thu Dec 2 14:22:54 2021 +0100

    Pulsar Admin: reduce code duplication - part 4 (#13086)
---
 .../pulsar/client/admin/internal/SinksImpl.java    | 191 ++------------------
 .../pulsar/client/admin/internal/SourcesImpl.java  | 193 ++-------------------
 .../pulsar/client/admin/internal/TenantsImpl.java  |  70 +-------
 .../client/admin/internal/TransactionsImpl.java    | 129 ++------------
 4 files changed, 51 insertions(+), 532 deletions(-)

diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
index 215c893..f27071e 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
@@ -24,9 +24,6 @@ import com.google.gson.Gson;
 import java.io.File;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
@@ -66,16 +63,7 @@ public class SinksImpl extends ComponentResource implements 
Sinks, Sink {
 
     @Override
     public List<String> listSinks(String tenant, String namespace) throws 
PulsarAdminException {
-        try {
-            return listSinksAsync(tenant, namespace).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> listSinksAsync(tenant, namespace));
     }
 
     @Override
@@ -106,16 +94,7 @@ public class SinksImpl extends ComponentResource implements 
Sinks, Sink {
 
     @Override
     public SinkConfig getSink(String tenant, String namespace, String 
sinkName) throws PulsarAdminException {
-        try {
-            return getSinkAsync(tenant, namespace, 
sinkName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> getSinkAsync(tenant, namespace, sinkName));
     }
 
     @Override
@@ -147,16 +126,7 @@ public class SinksImpl extends ComponentResource 
implements Sinks, Sink {
     @Override
     public SinkStatus getSinkStatus(
             String tenant, String namespace, String sinkName) throws 
PulsarAdminException {
-        try {
-            return getSinkStatusAsync(tenant, namespace, 
sinkName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> getSinkStatusAsync(tenant, namespace, sinkName));
     }
 
     @Override
@@ -188,16 +158,7 @@ public class SinksImpl extends ComponentResource 
implements Sinks, Sink {
     @Override
     public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(
             String tenant, String namespace, String sinkName, int id) throws 
PulsarAdminException {
-        try {
-            return getSinkStatusAsync(tenant, namespace, sinkName, 
id).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> getSinkStatusAsync(tenant, namespace, sinkName, id));
     }
 
     @Override
@@ -231,16 +192,7 @@ public class SinksImpl extends ComponentResource 
implements Sinks, Sink {
 
     @Override
     public void createSink(SinkConfig sinkConfig, String fileName) throws 
PulsarAdminException {
-        try {
-            createSinkAsync(sinkConfig, fileName).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> createSinkAsync(sinkConfig, fileName));
     }
 
     @Override
@@ -284,16 +236,7 @@ public class SinksImpl extends ComponentResource 
implements Sinks, Sink {
 
     @Override
     public void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws 
PulsarAdminException {
-        try {
-            createSinkWithUrlAsync(sinkConfig, pkgUrl).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> createSinkWithUrlAsync(sinkConfig, pkgUrl));
     }
 
     @Override
@@ -314,16 +257,7 @@ public class SinksImpl extends ComponentResource 
implements Sinks, Sink {
 
     @Override
     public void deleteSink(String cluster, String namespace, String function) 
throws PulsarAdminException {
-        try {
-            deleteSinkAsync(cluster, namespace, 
function).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> deleteSinkAsync(cluster, namespace, function));
     }
 
     @Override
@@ -339,16 +273,7 @@ public class SinksImpl extends ComponentResource 
implements Sinks, Sink {
     @Override
     public void updateSink(SinkConfig sinkConfig, String fileName, 
UpdateOptions updateOptions)
             throws PulsarAdminException {
-        try {
-            updateSinkAsync(sinkConfig, fileName, 
updateOptions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> updateSinkAsync(sinkConfig, fileName, updateOptions));
     }
 
     @Override
@@ -411,16 +336,7 @@ public class SinksImpl extends ComponentResource 
implements Sinks, Sink {
     @Override
     public void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl, 
UpdateOptions updateOptions)
             throws PulsarAdminException {
-        try {
-            updateSinkWithUrlAsync(sinkConfig, pkgUrl, 
updateOptions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> updateSinkWithUrlAsync(sinkConfig, pkgUrl, updateOptions));
     }
 
     @Override
@@ -466,17 +382,7 @@ public class SinksImpl extends ComponentResource 
implements Sinks, Sink {
     @Override
     public void restartSink(String tenant, String namespace, String 
functionName, int instanceId)
             throws PulsarAdminException {
-        try {
-            restartSinkAsync(tenant, namespace, functionName, instanceId)
-                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> restartSinkAsync(tenant, namespace, functionName, 
instanceId));
     }
 
     @Override
@@ -493,16 +399,7 @@ public class SinksImpl extends ComponentResource 
implements Sinks, Sink {
 
     @Override
     public void restartSink(String tenant, String namespace, String 
functionName) throws PulsarAdminException {
-        try {
-            restartSinkAsync(tenant, namespace, 
functionName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> restartSinkAsync(tenant, namespace, functionName));
     }
 
     @Override
@@ -518,16 +415,7 @@ public class SinksImpl extends ComponentResource 
implements Sinks, Sink {
     @Override
     public void stopSink(String tenant, String namespace, String sinkName, int 
instanceId)
             throws PulsarAdminException {
-        try {
-            stopSinkAsync(tenant, namespace, sinkName, 
instanceId).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> stopSinkAsync(tenant, namespace, sinkName, instanceId));
     }
 
     @Override
@@ -543,16 +431,7 @@ public class SinksImpl extends ComponentResource 
implements Sinks, Sink {
 
     @Override
     public void stopSink(String tenant, String namespace, String sinkName) 
throws PulsarAdminException {
-        try {
-            stopSinkAsync(tenant, namespace, sinkName).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> stopSinkAsync(tenant, namespace, sinkName));
     }
 
     @Override
@@ -568,16 +447,7 @@ public class SinksImpl extends ComponentResource 
implements Sinks, Sink {
     @Override
     public void startSink(String tenant, String namespace, String sinkName, 
int instanceId)
             throws PulsarAdminException {
-        try {
-            startSinkAsync(tenant, namespace, sinkName, 
instanceId).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> startSinkAsync(tenant, namespace, sinkName, instanceId));
     }
 
     @Override
@@ -593,16 +463,7 @@ public class SinksImpl extends ComponentResource 
implements Sinks, Sink {
 
     @Override
     public void startSink(String tenant, String namespace, String sinkName) 
throws PulsarAdminException {
-        try {
-            startSinkAsync(tenant, namespace, 
sinkName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> startSinkAsync(tenant, namespace, sinkName));
     }
 
     @Override
@@ -617,16 +478,7 @@ public class SinksImpl extends ComponentResource 
implements Sinks, Sink {
 
     @Override
     public List<ConnectorDefinition> getBuiltInSinks() throws 
PulsarAdminException {
-        try {
-            return getBuiltInSinksAsync().get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> getBuiltInSinksAsync());
     }
 
     @Override
@@ -655,16 +507,7 @@ public class SinksImpl extends ComponentResource 
implements Sinks, Sink {
 
     @Override
     public void reloadBuiltInSinks() throws PulsarAdminException {
-        try {
-            reloadBuiltInSinksAsync().get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> reloadBuiltInSinksAsync());
     }
 
     @Override
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
index f52845a..4996e8b 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
@@ -24,9 +24,6 @@ import com.google.gson.Gson;
 import java.io.File;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
@@ -65,16 +62,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
 
     @Override
     public List<String> listSources(String tenant, String namespace) throws 
PulsarAdminException {
-        try {
-            return listSourcesAsync(tenant, namespace).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> listSourcesAsync(tenant, namespace));
     }
 
     @Override
@@ -102,16 +90,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
 
     @Override
     public SourceConfig getSource(String tenant, String namespace, String 
sourceName) throws PulsarAdminException {
-        try {
-            return getSourceAsync(tenant, namespace, 
sourceName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> getSourceAsync(tenant, namespace, sourceName));
     }
 
     @Override
@@ -140,16 +119,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
     @Override
     public SourceStatus getSourceStatus(
             String tenant, String namespace, String sourceName) throws 
PulsarAdminException {
-        try {
-            return getSourceStatusAsync(tenant, namespace, 
sourceName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> getSourceStatusAsync(tenant, namespace, sourceName));
     }
 
     @Override
@@ -178,17 +148,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
     @Override
     public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
getSourceStatus(
             String tenant, String namespace, String sourceName, int id) throws 
PulsarAdminException {
-        try {
-            return getSourceStatusAsync(tenant, namespace, sourceName, id)
-                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> getSourceStatusAsync(tenant, namespace, sourceName, 
id));
     }
 
     @Override
@@ -219,16 +179,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
 
     @Override
     public void createSource(SourceConfig sourceConfig, String fileName) 
throws PulsarAdminException {
-        try {
-            createSourceAsync(sourceConfig, fileName).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> createSourceAsync(sourceConfig, fileName));
     }
 
     @Override
@@ -269,16 +220,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
 
     @Override
     public void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) 
throws PulsarAdminException {
-        try {
-            createSourceWithUrlAsync(sourceConfig, 
pkgUrl).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> createSourceWithUrlAsync(sourceConfig, pkgUrl));
     }
 
     @Override
@@ -295,16 +237,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
 
     @Override
     public void deleteSource(String cluster, String namespace, String 
function) throws PulsarAdminException {
-        try {
-            deleteSourceAsync(cluster, namespace, 
function).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> deleteSourceAsync(cluster, namespace, function));
     }
 
     @Override
@@ -316,16 +249,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
     @Override
     public void updateSource(SourceConfig sourceConfig, String fileName, 
UpdateOptions updateOptions)
             throws PulsarAdminException {
-        try {
-            updateSourceAsync(sourceConfig, fileName, 
updateOptions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> updateSourceAsync(sourceConfig, fileName, updateOptions));
     }
 
     @Override
@@ -385,17 +309,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
     @Override
     public void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl, 
UpdateOptions updateOptions)
             throws PulsarAdminException {
-        try {
-            updateSourceWithUrlAsync(sourceConfig, pkgUrl, updateOptions)
-                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> updateSourceWithUrlAsync(sourceConfig, pkgUrl, 
updateOptions));
     }
 
     @Override
@@ -438,17 +352,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
     @Override
     public void restartSource(String tenant, String namespace, String 
functionName, int instanceId)
             throws PulsarAdminException {
-        try {
-            restartSourceAsync(tenant, namespace, functionName, instanceId)
-                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> restartSourceAsync(tenant, namespace, functionName, 
instanceId));
     }
 
     @Override
@@ -461,16 +365,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
 
     @Override
     public void restartSource(String tenant, String namespace, String 
functionName) throws PulsarAdminException {
-        try {
-            restartSourceAsync(tenant, namespace, 
functionName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> restartSourceAsync(tenant, namespace, functionName));
     }
 
     @Override
@@ -482,16 +377,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
     @Override
     public void stopSource(String tenant, String namespace, String sourceName, 
int instanceId)
             throws PulsarAdminException {
-        try {
-            stopSourceAsync(tenant, namespace, sourceName, 
instanceId).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> stopSourceAsync(tenant, namespace, sourceName, instanceId));
     }
 
     @Override
@@ -503,16 +389,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
 
     @Override
     public void stopSource(String tenant, String namespace, String sourceName) 
throws PulsarAdminException {
-        try {
-            stopSourceAsync(tenant, namespace, 
sourceName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> stopSourceAsync(tenant, namespace, sourceName));
     }
 
     @Override
@@ -524,16 +401,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
     @Override
     public void startSource(String tenant, String namespace, String 
sourceName, int instanceId)
             throws PulsarAdminException {
-        try {
-            startSourceAsync(tenant, namespace, sourceName, 
instanceId).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> startSourceAsync(tenant, namespace, sourceName, 
instanceId));
     }
 
     @Override
@@ -546,16 +414,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
 
     @Override
     public void startSource(String tenant, String namespace, String 
sourceName) throws PulsarAdminException {
-        try {
-            startSourceAsync(tenant, namespace, 
sourceName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> startSourceAsync(tenant, namespace, sourceName));
     }
 
     @Override
@@ -566,16 +425,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
 
     @Override
     public List<ConnectorDefinition> getBuiltInSources() throws 
PulsarAdminException {
-        try {
-            return getBuiltInSourcesAsync().get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> getBuiltInSourcesAsync());
     }
 
     @Override
@@ -604,16 +454,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
 
     @Override
     public void reloadBuiltInSources() throws PulsarAdminException {
-        try {
-            reloadBuiltInSourcesAsync().get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> reloadBuiltInSourcesAsync());
     }
 
     @Override
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
index 953fb98..74cecf6 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
@@ -20,9 +20,6 @@ package org.apache.pulsar.client.admin.internal;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
@@ -45,16 +42,7 @@ public class TenantsImpl extends BaseResource implements 
Tenants, Properties {
 
     @Override
     public List<String> getTenants() throws PulsarAdminException {
-        try {
-            return getTenantsAsync().get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> getTenantsAsync());
     }
 
     @Override
@@ -77,16 +65,7 @@ public class TenantsImpl extends BaseResource implements 
Tenants, Properties {
 
     @Override
     public TenantInfo getTenantInfo(String tenant) throws PulsarAdminException 
{
-        try {
-            return getTenantInfoAsync(tenant).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> getTenantInfoAsync(tenant));
     }
 
     @Override
@@ -110,17 +89,7 @@ public class TenantsImpl extends BaseResource implements 
Tenants, Properties {
 
     @Override
     public void createTenant(String tenant, TenantInfo config) throws 
PulsarAdminException {
-        try {
-            createTenantAsync(tenant, config)
-                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> createTenantAsync(tenant, config));
     }
 
     @Override
@@ -131,16 +100,7 @@ public class TenantsImpl extends BaseResource implements 
Tenants, Properties {
 
     @Override
     public void updateTenant(String tenant, TenantInfo config) throws 
PulsarAdminException {
-        try {
-            updateTenantAsync(tenant, config).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> updateTenantAsync(tenant, config));
     }
 
     @Override
@@ -151,30 +111,12 @@ public class TenantsImpl extends BaseResource implements 
Tenants, Properties {
 
     @Override
     public void deleteTenant(String tenant) throws PulsarAdminException {
-        try {
-            deleteTenantAsync(tenant).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> deleteTenantAsync(tenant));
     }
 
     @Override
     public void deleteTenant(String tenant, boolean force) throws 
PulsarAdminException {
-        try {
-            deleteTenantAsync(tenant, force).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> deleteTenantAsync(tenant, force));
     }
 
     @Override
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
index bcee3bc..46262d2 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
@@ -20,9 +20,7 @@ package org.apache.pulsar.client.admin.internal;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -69,17 +67,7 @@ public class TransactionsImpl extends BaseResource 
implements Transactions {
 
     @Override
     public TransactionCoordinatorStats getCoordinatorStatsById(int 
coordinatorId) throws PulsarAdminException {
-        try {
-            return getCoordinatorStatsByIdAsync(coordinatorId)
-                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> getCoordinatorStatsByIdAsync(coordinatorId));
     }
 
     @Override
@@ -103,16 +91,7 @@ public class TransactionsImpl extends BaseResource 
implements Transactions {
 
     @Override
     public Map<Integer, TransactionCoordinatorStats> getCoordinatorStats() 
throws PulsarAdminException {
-        try {
-            return getCoordinatorStatsAsync().get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> getCoordinatorStatsAsync());
     }
 
     @Override
@@ -140,16 +119,7 @@ public class TransactionsImpl extends BaseResource 
implements Transactions {
 
     @Override
     public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID, 
String topic) throws PulsarAdminException {
-        try {
-            return getTransactionInBufferStatsAsync(txnID, 
topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> getTransactionInBufferStatsAsync(txnID, topic));
     }
 
     @Override
@@ -180,17 +150,7 @@ public class TransactionsImpl extends BaseResource 
implements Transactions {
     @Override
     public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID 
txnID, String topic,
                                                                         String 
subName) throws PulsarAdminException {
-        try {
-            return getTransactionInPendingAckStatsAsync(txnID, topic, subName)
-                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> getTransactionInPendingAckStatsAsync(txnID, topic, 
subName));
     }
 
     @Override
@@ -216,17 +176,7 @@ public class TransactionsImpl extends BaseResource 
implements Transactions {
 
     @Override
     public TransactionMetadata getTransactionMetadata(TxnID txnID) throws 
PulsarAdminException {
-        try {
-            return getTransactionMetadataAsync(txnID)
-                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> getTransactionMetadataAsync(txnID));
     }
 
     @Override
@@ -251,16 +201,7 @@ public class TransactionsImpl extends BaseResource 
implements Transactions {
 
     @Override
     public TransactionBufferStats getTransactionBufferStats(String topic) 
throws PulsarAdminException {
-        try {
-            return 
getTransactionBufferStatsAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> getTransactionBufferStatsAsync(topic));
     }
 
     @Override
@@ -286,16 +227,7 @@ public class TransactionsImpl extends BaseResource 
implements Transactions {
 
     @Override
     public TransactionPendingAckStats getPendingAckStats(String topic, String 
subName) throws PulsarAdminException {
-        try {
-            return getPendingAckStatsAsync(topic, 
subName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> getPendingAckStatsAsync(topic, subName));
     }
 
     @Override
@@ -327,17 +259,7 @@ public class TransactionsImpl extends BaseResource 
implements Transactions {
                                                                                
long timeout,
                                                                                
TimeUnit timeUnit)
             throws PulsarAdminException {
-        try {
-            return getSlowTransactionsByCoordinatorIdAsync(coordinatorId, 
timeout, timeUnit)
-                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> 
getSlowTransactionsByCoordinatorIdAsync(coordinatorId, timeout, timeUnit));
     }
 
     @Override
@@ -349,16 +271,7 @@ public class TransactionsImpl extends BaseResource 
implements Transactions {
     @Override
     public Map<String, TransactionMetadata> getSlowTransactions(long timeout,
                                                                 TimeUnit 
timeUnit) throws PulsarAdminException {
-        try {
-            return getSlowTransactionsAsync(timeout, 
timeUnit).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> getSlowTransactionsAsync(timeout, timeUnit));
     }
 
     @Override
@@ -387,17 +300,7 @@ public class TransactionsImpl extends BaseResource 
implements Transactions {
     public TransactionCoordinatorInternalStats getCoordinatorInternalStats(int 
coordinatorId,
                                                                            
boolean metadata)
             throws PulsarAdminException {
-        try {
-            return getCoordinatorInternalStatsAsync(coordinatorId, metadata)
-                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> getCoordinatorInternalStatsAsync(coordinatorId, 
metadata));
     }
 
     @Override
@@ -429,17 +332,7 @@ public class TransactionsImpl extends BaseResource 
implements Transactions {
     public TransactionPendingAckInternalStats 
getPendingAckInternalStats(String topic,
                                                                          
String subName,
                                                                          
boolean metadata) throws PulsarAdminException {
-        try {
-            return getPendingAckInternalStatsAsync(topic, subName, metadata)
-                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        return sync(() -> getPendingAckInternalStatsAsync(topic, subName, 
metadata));
     }
 
 }

Reply via email to