This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8297c32 Pulsar Admin: reduce code duplication - part 4 (#13086)
8297c32 is described below
commit 8297c32fb24fedc0904293760c45ccb423d8fe79
Author: Enrico Olivelli <[email protected]>
AuthorDate: Thu Dec 2 14:22:54 2021 +0100
Pulsar Admin: reduce code duplication - part 4 (#13086)
---
.../pulsar/client/admin/internal/SinksImpl.java | 191 ++------------------
.../pulsar/client/admin/internal/SourcesImpl.java | 193 ++-------------------
.../pulsar/client/admin/internal/TenantsImpl.java | 70 +-------
.../client/admin/internal/TransactionsImpl.java | 129 ++------------
4 files changed, 51 insertions(+), 532 deletions(-)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
index 215c893..f27071e 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
@@ -24,9 +24,6 @@ import com.google.gson.Gson;
import java.io.File;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
@@ -66,16 +63,7 @@ public class SinksImpl extends ComponentResource implements
Sinks, Sink {
@Override
public List<String> listSinks(String tenant, String namespace) throws
PulsarAdminException {
- try {
- return listSinksAsync(tenant, namespace).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> listSinksAsync(tenant, namespace));
}
@Override
@@ -106,16 +94,7 @@ public class SinksImpl extends ComponentResource implements
Sinks, Sink {
@Override
public SinkConfig getSink(String tenant, String namespace, String
sinkName) throws PulsarAdminException {
- try {
- return getSinkAsync(tenant, namespace,
sinkName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> getSinkAsync(tenant, namespace, sinkName));
}
@Override
@@ -147,16 +126,7 @@ public class SinksImpl extends ComponentResource
implements Sinks, Sink {
@Override
public SinkStatus getSinkStatus(
String tenant, String namespace, String sinkName) throws
PulsarAdminException {
- try {
- return getSinkStatusAsync(tenant, namespace,
sinkName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> getSinkStatusAsync(tenant, namespace, sinkName));
}
@Override
@@ -188,16 +158,7 @@ public class SinksImpl extends ComponentResource
implements Sinks, Sink {
@Override
public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(
String tenant, String namespace, String sinkName, int id) throws
PulsarAdminException {
- try {
- return getSinkStatusAsync(tenant, namespace, sinkName,
id).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> getSinkStatusAsync(tenant, namespace, sinkName, id));
}
@Override
@@ -231,16 +192,7 @@ public class SinksImpl extends ComponentResource
implements Sinks, Sink {
@Override
public void createSink(SinkConfig sinkConfig, String fileName) throws
PulsarAdminException {
- try {
- createSinkAsync(sinkConfig, fileName).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> createSinkAsync(sinkConfig, fileName));
}
@Override
@@ -284,16 +236,7 @@ public class SinksImpl extends ComponentResource
implements Sinks, Sink {
@Override
public void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws
PulsarAdminException {
- try {
- createSinkWithUrlAsync(sinkConfig, pkgUrl).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> createSinkWithUrlAsync(sinkConfig, pkgUrl));
}
@Override
@@ -314,16 +257,7 @@ public class SinksImpl extends ComponentResource
implements Sinks, Sink {
@Override
public void deleteSink(String cluster, String namespace, String function)
throws PulsarAdminException {
- try {
- deleteSinkAsync(cluster, namespace,
function).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> deleteSinkAsync(cluster, namespace, function));
}
@Override
@@ -339,16 +273,7 @@ public class SinksImpl extends ComponentResource
implements Sinks, Sink {
@Override
public void updateSink(SinkConfig sinkConfig, String fileName,
UpdateOptions updateOptions)
throws PulsarAdminException {
- try {
- updateSinkAsync(sinkConfig, fileName,
updateOptions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> updateSinkAsync(sinkConfig, fileName, updateOptions));
}
@Override
@@ -411,16 +336,7 @@ public class SinksImpl extends ComponentResource
implements Sinks, Sink {
@Override
public void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl,
UpdateOptions updateOptions)
throws PulsarAdminException {
- try {
- updateSinkWithUrlAsync(sinkConfig, pkgUrl,
updateOptions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> updateSinkWithUrlAsync(sinkConfig, pkgUrl, updateOptions));
}
@Override
@@ -466,17 +382,7 @@ public class SinksImpl extends ComponentResource
implements Sinks, Sink {
@Override
public void restartSink(String tenant, String namespace, String
functionName, int instanceId)
throws PulsarAdminException {
- try {
- restartSinkAsync(tenant, namespace, functionName, instanceId)
- .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> restartSinkAsync(tenant, namespace, functionName,
instanceId));
}
@Override
@@ -493,16 +399,7 @@ public class SinksImpl extends ComponentResource
implements Sinks, Sink {
@Override
public void restartSink(String tenant, String namespace, String
functionName) throws PulsarAdminException {
- try {
- restartSinkAsync(tenant, namespace,
functionName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> restartSinkAsync(tenant, namespace, functionName));
}
@Override
@@ -518,16 +415,7 @@ public class SinksImpl extends ComponentResource
implements Sinks, Sink {
@Override
public void stopSink(String tenant, String namespace, String sinkName, int
instanceId)
throws PulsarAdminException {
- try {
- stopSinkAsync(tenant, namespace, sinkName,
instanceId).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> stopSinkAsync(tenant, namespace, sinkName, instanceId));
}
@Override
@@ -543,16 +431,7 @@ public class SinksImpl extends ComponentResource
implements Sinks, Sink {
@Override
public void stopSink(String tenant, String namespace, String sinkName)
throws PulsarAdminException {
- try {
- stopSinkAsync(tenant, namespace, sinkName).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> stopSinkAsync(tenant, namespace, sinkName));
}
@Override
@@ -568,16 +447,7 @@ public class SinksImpl extends ComponentResource
implements Sinks, Sink {
@Override
public void startSink(String tenant, String namespace, String sinkName,
int instanceId)
throws PulsarAdminException {
- try {
- startSinkAsync(tenant, namespace, sinkName,
instanceId).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> startSinkAsync(tenant, namespace, sinkName, instanceId));
}
@Override
@@ -593,16 +463,7 @@ public class SinksImpl extends ComponentResource
implements Sinks, Sink {
@Override
public void startSink(String tenant, String namespace, String sinkName)
throws PulsarAdminException {
- try {
- startSinkAsync(tenant, namespace,
sinkName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> startSinkAsync(tenant, namespace, sinkName));
}
@Override
@@ -617,16 +478,7 @@ public class SinksImpl extends ComponentResource
implements Sinks, Sink {
@Override
public List<ConnectorDefinition> getBuiltInSinks() throws
PulsarAdminException {
- try {
- return getBuiltInSinksAsync().get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> getBuiltInSinksAsync());
}
@Override
@@ -655,16 +507,7 @@ public class SinksImpl extends ComponentResource
implements Sinks, Sink {
@Override
public void reloadBuiltInSinks() throws PulsarAdminException {
- try {
- reloadBuiltInSinksAsync().get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> reloadBuiltInSinksAsync());
}
@Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
index f52845a..4996e8b 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
@@ -24,9 +24,6 @@ import com.google.gson.Gson;
import java.io.File;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
@@ -65,16 +62,7 @@ public class SourcesImpl extends ComponentResource
implements Sources, Source {
@Override
public List<String> listSources(String tenant, String namespace) throws
PulsarAdminException {
- try {
- return listSourcesAsync(tenant, namespace).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> listSourcesAsync(tenant, namespace));
}
@Override
@@ -102,16 +90,7 @@ public class SourcesImpl extends ComponentResource
implements Sources, Source {
@Override
public SourceConfig getSource(String tenant, String namespace, String
sourceName) throws PulsarAdminException {
- try {
- return getSourceAsync(tenant, namespace,
sourceName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> getSourceAsync(tenant, namespace, sourceName));
}
@Override
@@ -140,16 +119,7 @@ public class SourcesImpl extends ComponentResource
implements Sources, Source {
@Override
public SourceStatus getSourceStatus(
String tenant, String namespace, String sourceName) throws
PulsarAdminException {
- try {
- return getSourceStatusAsync(tenant, namespace,
sourceName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> getSourceStatusAsync(tenant, namespace, sourceName));
}
@Override
@@ -178,17 +148,7 @@ public class SourcesImpl extends ComponentResource
implements Sources, Source {
@Override
public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData
getSourceStatus(
String tenant, String namespace, String sourceName, int id) throws
PulsarAdminException {
- try {
- return getSourceStatusAsync(tenant, namespace, sourceName, id)
- .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> getSourceStatusAsync(tenant, namespace, sourceName,
id));
}
@Override
@@ -219,16 +179,7 @@ public class SourcesImpl extends ComponentResource
implements Sources, Source {
@Override
public void createSource(SourceConfig sourceConfig, String fileName)
throws PulsarAdminException {
- try {
- createSourceAsync(sourceConfig, fileName).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> createSourceAsync(sourceConfig, fileName));
}
@Override
@@ -269,16 +220,7 @@ public class SourcesImpl extends ComponentResource
implements Sources, Source {
@Override
public void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl)
throws PulsarAdminException {
- try {
- createSourceWithUrlAsync(sourceConfig,
pkgUrl).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> createSourceWithUrlAsync(sourceConfig, pkgUrl));
}
@Override
@@ -295,16 +237,7 @@ public class SourcesImpl extends ComponentResource
implements Sources, Source {
@Override
public void deleteSource(String cluster, String namespace, String
function) throws PulsarAdminException {
- try {
- deleteSourceAsync(cluster, namespace,
function).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> deleteSourceAsync(cluster, namespace, function));
}
@Override
@@ -316,16 +249,7 @@ public class SourcesImpl extends ComponentResource
implements Sources, Source {
@Override
public void updateSource(SourceConfig sourceConfig, String fileName,
UpdateOptions updateOptions)
throws PulsarAdminException {
- try {
- updateSourceAsync(sourceConfig, fileName,
updateOptions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> updateSourceAsync(sourceConfig, fileName, updateOptions));
}
@Override
@@ -385,17 +309,7 @@ public class SourcesImpl extends ComponentResource
implements Sources, Source {
@Override
public void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl,
UpdateOptions updateOptions)
throws PulsarAdminException {
- try {
- updateSourceWithUrlAsync(sourceConfig, pkgUrl, updateOptions)
- .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> updateSourceWithUrlAsync(sourceConfig, pkgUrl,
updateOptions));
}
@Override
@@ -438,17 +352,7 @@ public class SourcesImpl extends ComponentResource
implements Sources, Source {
@Override
public void restartSource(String tenant, String namespace, String
functionName, int instanceId)
throws PulsarAdminException {
- try {
- restartSourceAsync(tenant, namespace, functionName, instanceId)
- .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> restartSourceAsync(tenant, namespace, functionName,
instanceId));
}
@Override
@@ -461,16 +365,7 @@ public class SourcesImpl extends ComponentResource
implements Sources, Source {
@Override
public void restartSource(String tenant, String namespace, String
functionName) throws PulsarAdminException {
- try {
- restartSourceAsync(tenant, namespace,
functionName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> restartSourceAsync(tenant, namespace, functionName));
}
@Override
@@ -482,16 +377,7 @@ public class SourcesImpl extends ComponentResource
implements Sources, Source {
@Override
public void stopSource(String tenant, String namespace, String sourceName,
int instanceId)
throws PulsarAdminException {
- try {
- stopSourceAsync(tenant, namespace, sourceName,
instanceId).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> stopSourceAsync(tenant, namespace, sourceName, instanceId));
}
@Override
@@ -503,16 +389,7 @@ public class SourcesImpl extends ComponentResource
implements Sources, Source {
@Override
public void stopSource(String tenant, String namespace, String sourceName)
throws PulsarAdminException {
- try {
- stopSourceAsync(tenant, namespace,
sourceName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> stopSourceAsync(tenant, namespace, sourceName));
}
@Override
@@ -524,16 +401,7 @@ public class SourcesImpl extends ComponentResource
implements Sources, Source {
@Override
public void startSource(String tenant, String namespace, String
sourceName, int instanceId)
throws PulsarAdminException {
- try {
- startSourceAsync(tenant, namespace, sourceName,
instanceId).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> startSourceAsync(tenant, namespace, sourceName,
instanceId));
}
@Override
@@ -546,16 +414,7 @@ public class SourcesImpl extends ComponentResource
implements Sources, Source {
@Override
public void startSource(String tenant, String namespace, String
sourceName) throws PulsarAdminException {
- try {
- startSourceAsync(tenant, namespace,
sourceName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> startSourceAsync(tenant, namespace, sourceName));
}
@Override
@@ -566,16 +425,7 @@ public class SourcesImpl extends ComponentResource
implements Sources, Source {
@Override
public List<ConnectorDefinition> getBuiltInSources() throws
PulsarAdminException {
- try {
- return getBuiltInSourcesAsync().get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> getBuiltInSourcesAsync());
}
@Override
@@ -604,16 +454,7 @@ public class SourcesImpl extends ComponentResource
implements Sources, Source {
@Override
public void reloadBuiltInSources() throws PulsarAdminException {
- try {
- reloadBuiltInSourcesAsync().get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> reloadBuiltInSourcesAsync());
}
@Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
index 953fb98..74cecf6 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java
@@ -20,9 +20,6 @@ package org.apache.pulsar.client.admin.internal;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
@@ -45,16 +42,7 @@ public class TenantsImpl extends BaseResource implements
Tenants, Properties {
@Override
public List<String> getTenants() throws PulsarAdminException {
- try {
- return getTenantsAsync().get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> getTenantsAsync());
}
@Override
@@ -77,16 +65,7 @@ public class TenantsImpl extends BaseResource implements
Tenants, Properties {
@Override
public TenantInfo getTenantInfo(String tenant) throws PulsarAdminException
{
- try {
- return getTenantInfoAsync(tenant).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> getTenantInfoAsync(tenant));
}
@Override
@@ -110,17 +89,7 @@ public class TenantsImpl extends BaseResource implements
Tenants, Properties {
@Override
public void createTenant(String tenant, TenantInfo config) throws
PulsarAdminException {
- try {
- createTenantAsync(tenant, config)
- .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> createTenantAsync(tenant, config));
}
@Override
@@ -131,16 +100,7 @@ public class TenantsImpl extends BaseResource implements
Tenants, Properties {
@Override
public void updateTenant(String tenant, TenantInfo config) throws
PulsarAdminException {
- try {
- updateTenantAsync(tenant, config).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> updateTenantAsync(tenant, config));
}
@Override
@@ -151,30 +111,12 @@ public class TenantsImpl extends BaseResource implements
Tenants, Properties {
@Override
public void deleteTenant(String tenant) throws PulsarAdminException {
- try {
- deleteTenantAsync(tenant).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> deleteTenantAsync(tenant));
}
@Override
public void deleteTenant(String tenant, boolean force) throws
PulsarAdminException {
- try {
- deleteTenantAsync(tenant, force).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ sync(() -> deleteTenantAsync(tenant, force));
}
@Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
index bcee3bc..46262d2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
@@ -20,9 +20,7 @@ package org.apache.pulsar.client.admin.internal;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -69,17 +67,7 @@ public class TransactionsImpl extends BaseResource
implements Transactions {
@Override
public TransactionCoordinatorStats getCoordinatorStatsById(int
coordinatorId) throws PulsarAdminException {
- try {
- return getCoordinatorStatsByIdAsync(coordinatorId)
- .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> getCoordinatorStatsByIdAsync(coordinatorId));
}
@Override
@@ -103,16 +91,7 @@ public class TransactionsImpl extends BaseResource
implements Transactions {
@Override
public Map<Integer, TransactionCoordinatorStats> getCoordinatorStats()
throws PulsarAdminException {
- try {
- return getCoordinatorStatsAsync().get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> getCoordinatorStatsAsync());
}
@Override
@@ -140,16 +119,7 @@ public class TransactionsImpl extends BaseResource
implements Transactions {
@Override
public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID,
String topic) throws PulsarAdminException {
- try {
- return getTransactionInBufferStatsAsync(txnID,
topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> getTransactionInBufferStatsAsync(txnID, topic));
}
@Override
@@ -180,17 +150,7 @@ public class TransactionsImpl extends BaseResource
implements Transactions {
@Override
public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID
txnID, String topic,
String
subName) throws PulsarAdminException {
- try {
- return getTransactionInPendingAckStatsAsync(txnID, topic, subName)
- .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> getTransactionInPendingAckStatsAsync(txnID, topic,
subName));
}
@Override
@@ -216,17 +176,7 @@ public class TransactionsImpl extends BaseResource
implements Transactions {
@Override
public TransactionMetadata getTransactionMetadata(TxnID txnID) throws
PulsarAdminException {
- try {
- return getTransactionMetadataAsync(txnID)
- .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> getTransactionMetadataAsync(txnID));
}
@Override
@@ -251,16 +201,7 @@ public class TransactionsImpl extends BaseResource
implements Transactions {
@Override
public TransactionBufferStats getTransactionBufferStats(String topic)
throws PulsarAdminException {
- try {
- return
getTransactionBufferStatsAsync(topic).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> getTransactionBufferStatsAsync(topic));
}
@Override
@@ -286,16 +227,7 @@ public class TransactionsImpl extends BaseResource
implements Transactions {
@Override
public TransactionPendingAckStats getPendingAckStats(String topic, String
subName) throws PulsarAdminException {
- try {
- return getPendingAckStatsAsync(topic,
subName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> getPendingAckStatsAsync(topic, subName));
}
@Override
@@ -327,17 +259,7 @@ public class TransactionsImpl extends BaseResource
implements Transactions {
long timeout,
TimeUnit timeUnit)
throws PulsarAdminException {
- try {
- return getSlowTransactionsByCoordinatorIdAsync(coordinatorId,
timeout, timeUnit)
- .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() ->
getSlowTransactionsByCoordinatorIdAsync(coordinatorId, timeout, timeUnit));
}
@Override
@@ -349,16 +271,7 @@ public class TransactionsImpl extends BaseResource
implements Transactions {
@Override
public Map<String, TransactionMetadata> getSlowTransactions(long timeout,
TimeUnit
timeUnit) throws PulsarAdminException {
- try {
- return getSlowTransactionsAsync(timeout,
timeUnit).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> getSlowTransactionsAsync(timeout, timeUnit));
}
@Override
@@ -387,17 +300,7 @@ public class TransactionsImpl extends BaseResource
implements Transactions {
public TransactionCoordinatorInternalStats getCoordinatorInternalStats(int
coordinatorId,
boolean metadata)
throws PulsarAdminException {
- try {
- return getCoordinatorInternalStatsAsync(coordinatorId, metadata)
- .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> getCoordinatorInternalStatsAsync(coordinatorId,
metadata));
}
@Override
@@ -429,17 +332,7 @@ public class TransactionsImpl extends BaseResource
implements Transactions {
public TransactionPendingAckInternalStats
getPendingAckInternalStats(String topic,
String subName,
boolean metadata) throws PulsarAdminException {
- try {
- return getPendingAckInternalStatsAsync(topic, subName, metadata)
- .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw (PulsarAdminException) e.getCause();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarAdminException(e);
- } catch (TimeoutException e) {
- throw new PulsarAdminException.TimeoutException(e);
- }
+ return sync(() -> getPendingAckInternalStatsAsync(topic, subName,
metadata));
}
}