This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 75d43eb [PulsarAdmin] Functions to async (#6580) 75d43eb is described below commit 75d43ebb8368920a3a7b3c51d26f1bb59c66da28 Author: Yijie Shen <henry.yijies...@gmail.com> AuthorDate: Mon Mar 23 13:12:16 2020 +0800 [PulsarAdmin] Functions to async (#6580) --- .../org/apache/pulsar/client/admin/Functions.java | 375 ++++++++++- .../client/admin/internal/FunctionsImpl.java | 737 ++++++++++++++++----- 2 files changed, 942 insertions(+), 170 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java index d6a7ff3..50b7bb5 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java @@ -20,6 +20,7 @@ package org.apache.pulsar.client.admin; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; @@ -54,6 +55,20 @@ public interface Functions { List<String> getFunctions(String tenant, String namespace) throws PulsarAdminException; /** + * Get the list of functions asynchronously. + * <p> + * Get the list of all the Pulsar functions. + * <p> + * Response Example: + * + * <pre> + * <code>["f1", "f2", "f3"]</code> + * </pre> + * + */ + CompletableFuture<List<String>> getFunctionsAsync(String tenant, String namespace); + + /** * Get the configuration for the specified function. * <p> * Response Example: @@ -81,6 +96,27 @@ public interface Functions { FunctionConfig getFunction(String tenant, String namespace, String function) throws PulsarAdminException; /** + * Get the configuration for the specified function asynchronously. + * <p> + * Response Example: + * + * <pre> + * <code>{ serviceUrl : "http://my-broker.example.com:8080/" }</code> + * </pre> + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + * + * @return the function configuration + * + */ + CompletableFuture<FunctionConfig> getFunctionAsync(String tenant, String namespace, String function); + + /** * Create a new function. * * @param functionConfig @@ -92,12 +128,20 @@ public interface Functions { void createFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException; /** - * <pre> + * Create a new function asynchronously. + * + * @param functionConfig + * the function configuration object + */ + CompletableFuture<Void> createFunctionAsync(FunctionConfig functionConfig, String fileName); + + /** + * Create a new function with package url. + * <p> * Create a new function by providing url from which fun-pkg can be downloaded. supported url: http/file * eg: * File: file:/dir/fileName.jar * Http: http://www.repo.com/fileName.jar - * </pre> * * @param functionConfig * the function configuration object @@ -108,6 +152,21 @@ public interface Functions { void createFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl) throws PulsarAdminException; /** + * Create a new function with package url asynchronously. + * <p> + * Create a new function by providing url from which fun-pkg can be downloaded. supported url: http/file + * eg: + * File: file:/dir/fileName.jar + * Http: http://www.repo.com/fileName.jar + * + * @param functionConfig + * the function configuration object + * @param pkgUrl + * url from which pkg can be downloaded + */ + CompletableFuture<Void> createFunctionWithUrlAsync(FunctionConfig functionConfig, String pkgUrl); + + /** * Update the configuration for a function. * <p> * @@ -124,6 +183,15 @@ public interface Functions { void updateFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException; /** + * Update the configuration for a function asynchronously. + * <p> + * + * @param functionConfig + * the function configuration object + */ + CompletableFuture<Void> updateFunctionAsync(FunctionConfig functionConfig, String fileName); + + /** * Update the configuration for a function. * <p> * @@ -141,13 +209,23 @@ public interface Functions { void updateFunction(FunctionConfig functionConfig, String fileName, UpdateOptions updateOptions) throws PulsarAdminException; /** + * Update the configuration for a function asynchronously. + * <p> + * + * @param functionConfig + * the function configuration object + * @param updateOptions + * options for the update operations + */ + CompletableFuture<Void> updateFunctionAsync(FunctionConfig functionConfig, String fileName, UpdateOptions updateOptions); + + /** * Update the configuration for a function. - * <pre> + * <p> * Update a function by providing url from which fun-pkg can be downloaded. supported url: http/file * eg: * File: file:/dir/fileName.jar * Http: http://www.repo.com/fileName.jar - * </pre> * * @param functionConfig * the function configuration object @@ -163,13 +241,27 @@ public interface Functions { void updateFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl) throws PulsarAdminException; /** + * Update the configuration for a function asynchronously. + * <p> + * Update a function by providing url from which fun-pkg can be downloaded. supported url: http/file + * eg: + * File: file:/dir/fileName.jar + * Http: http://www.repo.com/fileName.jar + * + * @param functionConfig + * the function configuration object + * @param pkgUrl + * url from which pkg can be downloaded + */ + CompletableFuture<Void> updateFunctionWithUrlAsync(FunctionConfig functionConfig, String pkgUrl); + + /** * Update the configuration for a function. - * <pre> + * <p> * Update a function by providing url from which fun-pkg can be downloaded. supported url: http/file * eg: * File: file:/dir/fileName.jar * Http: http://www.repo.com/fileName.jar - * </pre> * * @param functionConfig * the function configuration object @@ -186,9 +278,25 @@ public interface Functions { */ void updateFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl, UpdateOptions updateOptions) throws PulsarAdminException; + /** + * Update the configuration for a function asynchronously. + * <p> + * Update a function by providing url from which fun-pkg can be downloaded. supported url: http/file + * eg: + * File: file:/dir/fileName.jar + * Http: http://www.repo.com/fileName.jar + * + * @param functionConfig + * the function configuration object + * @param pkgUrl + * url from which pkg can be downloaded + * @param updateOptions + * options for the update operations + */ + CompletableFuture<Void> updateFunctionWithUrlAsync(FunctionConfig functionConfig, String pkgUrl, UpdateOptions updateOptions); /** - * Delete an existing function + * Delete an existing function. * <p> * Delete a function * @@ -211,6 +319,20 @@ public interface Functions { void deleteFunction(String tenant, String namespace, String function) throws PulsarAdminException; /** + * Delete an existing function asynchronously. + * <p> + * Delete a function + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + */ + CompletableFuture<Void> deleteFunctionAsync(String tenant, String namespace, String function); + + /** * Gets the current status of a function. * * @param tenant @@ -226,6 +348,18 @@ public interface Functions { FunctionStatus getFunctionStatus(String tenant, String namespace, String function) throws PulsarAdminException; /** + * Gets the current status of a function asynchronously. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + */ + CompletableFuture<FunctionStatus> getFunctionStatusAsync(String tenant, String namespace, String function); + + /** * Gets the current status of a function instance. * * @param tenant @@ -243,6 +377,21 @@ public interface Functions { throws PulsarAdminException; /** + * Gets the current status of a function instance asynchronously. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + * @param id + * Function instance-id + * @return + */ + CompletableFuture<FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData> getFunctionStatusAsync(String tenant, String namespace, String function, int id); + + /** * Gets the current stats of a function instance. * * @param tenant @@ -260,6 +409,21 @@ public interface Functions { throws PulsarAdminException; /** + * Gets the current stats of a function instance asynchronously. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + * @param id + * Function instance-id + * @return + */ + CompletableFuture<FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData> getFunctionStatsAsync(String tenant, String namespace, String function, int id); + + /** * Gets the current stats of a function. * * @param tenant @@ -276,7 +440,21 @@ public interface Functions { throws PulsarAdminException; /** - * Restart function instance + * Gets the current stats of a function asynchronously. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + * @return + */ + + CompletableFuture<FunctionStats> getFunctionStatsAsync(String tenant, String namespace, String function); + + /** + * Restart function instance. * * @param tenant * Tenant name @@ -294,7 +472,22 @@ public interface Functions { void restartFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException; /** - * Restart all function instances + * Restart function instance asynchronously. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + * + * @param instanceId + * Function instanceId + */ + CompletableFuture<Void> restartFunctionAsync(String tenant, String namespace, String function, int instanceId); + + /** + * Restart all function instances. * * @param tenant * Tenant name @@ -308,9 +501,20 @@ public interface Functions { */ void restartFunction(String tenant, String namespace, String function) throws PulsarAdminException; + /** + * Restart all function instances asynchronously. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + */ + CompletableFuture<Void> restartFunctionAsync(String tenant, String namespace, String function); /** - * Stop function instance + * Stop function instance. * * @param tenant * Tenant name @@ -328,7 +532,22 @@ public interface Functions { void stopFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException; /** - * Start all function instances + * Stop function instance asynchronously. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + * + * @param instanceId + * Function instanceId + */ + CompletableFuture<Void> stopFunctionAsync(String tenant, String namespace, String function, int instanceId); + + /** + * Start all function instances. * * @param tenant * Tenant name @@ -343,7 +562,19 @@ public interface Functions { void startFunction(String tenant, String namespace, String function) throws PulsarAdminException; /** - * Start function instance + * Start all function instances asynchronously. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + */ + CompletableFuture<Void> startFunctionAsync(String tenant, String namespace, String function); + + /** + * Start function instance. * * @param tenant * Tenant name @@ -361,7 +592,22 @@ public interface Functions { void startFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException; /** - * Stop all function instances + * Start function instance asynchronously. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + * + * @param instanceId + * Function instanceId + */ + CompletableFuture<Void> startFunctionAsync(String tenant, String namespace, String function, int instanceId); + + /** + * Stop all function instances. * * @param tenant * Tenant name @@ -375,6 +621,17 @@ public interface Functions { */ void stopFunction(String tenant, String namespace, String function) throws PulsarAdminException; + /** + * Stop all function instances asynchronously. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + */ + CompletableFuture<Void> stopFunctionAsync(String tenant, String namespace, String function); /** * Triggers the function by writing to the input topic. @@ -396,6 +653,22 @@ public interface Functions { String triggerFunction(String tenant, String namespace, String function, String topic, String triggerValue, String triggerFile) throws PulsarAdminException; /** + * Triggers the function by writing to the input topic asynchronously. + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + * @param triggerValue + * The input that will be written to input topic + * @param triggerFile + * The file which contains the input that will be written to input topic + */ + CompletableFuture<String> triggerFunctionAsync(String tenant, String namespace, String function, String topic, String triggerValue, String triggerFile); + + /** * Upload Data. * * @param sourceFile @@ -409,6 +682,16 @@ public interface Functions { void uploadFunction(String sourceFile, String path) throws PulsarAdminException; /** + * Upload Data asynchronously. + * + * @param sourceFile + * dataFile that needs to be uploaded + * @param path + * Path where data should be stored + */ + CompletableFuture<Void> uploadFunctionAsync(String sourceFile, String path); + + /** * Download Function Code. * * @param destinationFile @@ -425,6 +708,16 @@ public interface Functions { * Download Function Code. * * @param destinationFile + * file where data should be downloaded to + * @param path + * Path where data is located + */ + CompletableFuture<Void> downloadFunctionAsync(String destinationFile, String path); + + /** + * Download Function Code. + * + * @param destinationFile * file where data should be downloaded to * @param tenant * Tenant name @@ -437,6 +730,20 @@ public interface Functions { void downloadFunction(String destinationFile, String tenant, String namespace, String function) throws PulsarAdminException; /** + * Download Function Code asynchronously. + * + * @param destinationFile + * file where data should be downloaded to + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + */ + CompletableFuture<Void> downloadFunctionAsync(String destinationFile, String tenant, String namespace, String function); + + /** * Deprecated in favor of getting sources and sinks for their own APIs * * Fetches a list of supported Pulsar IO connectors currently running in cluster mode @@ -502,6 +809,28 @@ public interface Functions { FunctionState getFunctionState(String tenant, String namespace, String function, String key) throws PulsarAdminException; /** + * Fetch the current state associated with a Pulsar Function asynchronously. + * <p> + * Response Example: + * + * <pre> + * <code>{ "value : 12, version : 2"}</code> + * </pre> + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + * @param key + * Key name of State + * + * @return the function configuration + */ + CompletableFuture<FunctionState> getFunctionStateAsync(String tenant, String namespace, String function, String key); + + /** * Puts the given state associated with a Pulsar Function. * <p> * Response Example: @@ -527,4 +856,24 @@ public interface Functions { * Unexpected error */ void putFunctionState(String tenant, String namespace, String function, FunctionState state) throws PulsarAdminException; + + /** + * Puts the given state associated with a Pulsar Function asynchronously. + * <p> + * Response Example: + * + * <pre> + * <code>{ "value : 12, version : 2"}</code> + * </pre> + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + * @param state + * FunctionState + */ + CompletableFuture<Void> putFunctionStateAsync(String tenant, String namespace, String function, FunctionState state); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index bc2982d..a5b323c 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -19,7 +19,6 @@ package org.apache.pulsar.client.admin.internal; import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; import io.netty.handler.codec.http.HttpHeaders; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -47,6 +46,7 @@ import org.glassfish.jersey.media.multipart.FormDataMultiPart; import org.glassfish.jersey.media.multipart.file.FileDataBodyPart; import javax.ws.rs.client.Entity; +import javax.ws.rs.client.InvocationCallback; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.GenericType; import javax.ws.rs.core.MediaType; @@ -56,7 +56,10 @@ import java.io.FileOutputStream; import java.nio.channels.FileChannel; import java.util.List; import java.util.Set; -import java.util.concurrent.Future; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import static org.asynchttpclient.Dsl.get; @@ -78,188 +81,408 @@ public class FunctionsImpl extends ComponentResource implements Functions { @Override public List<String> getFunctions(String tenant, String namespace) throws PulsarAdminException { try { - Response response = request(functions.path(tenant).path(namespace)).get(); - if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw getApiException(response); - } - return response.readEntity(new GenericType<List<String>>() { - }); - } catch (Exception e) { - throw getApiException(e); + return getFunctionsAsync(tenant, namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } @Override + public CompletableFuture<List<String>> getFunctionsAsync(String tenant, String namespace) { + WebTarget path = functions.path(tenant).path(namespace); + final CompletableFuture<List<String>> future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback<Response>() { + @Override + public void completed(Response response) { + if (!response.getStatusInfo().equals(Response.Status.OK)) { + future.completeExceptionally(getApiException(response)); + } else { + List<String> functions + = response.readEntity(new GenericType<List<String>>() {}); + future.complete(functions); + } + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override public FunctionConfig getFunction(String tenant, String namespace, String function) throws PulsarAdminException { try { - Response response = request(functions.path(tenant).path(namespace).path(function)).get(); - if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw getApiException(response); - } - return response.readEntity(FunctionConfig.class); - } catch (Exception e) { - throw getApiException(e); + return getFunctionAsync(tenant, namespace, function).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } @Override + public CompletableFuture<FunctionConfig> getFunctionAsync(String tenant, String namespace, String function) { + WebTarget path = functions.path(tenant).path(namespace).path(function); + final CompletableFuture<FunctionConfig> future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback<Response>() { + @Override + public void completed(Response response) { + if (!response.getStatusInfo().equals(Response.Status.OK)) { + future.completeExceptionally(getApiException(response)); + } else { + future.complete(response.readEntity(FunctionConfig.class)); + } + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override public FunctionStatus getFunctionStatus( String tenant, String namespace, String function) throws PulsarAdminException { try { - Response response = request(functions.path(tenant).path(namespace).path(function).path("status")).get(); - if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw getApiException(response); - } - return response.readEntity(FunctionStatus.class); - } catch (Exception e) { - throw getApiException(e); + return getFunctionStatusAsync(tenant, namespace, function).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } + @Override + public CompletableFuture<FunctionStatus> getFunctionStatusAsync(String tenant, String namespace, String function) { + WebTarget path = functions.path(tenant).path(namespace).path(function).path("status"); + final CompletableFuture<FunctionStatus> future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback<Response>() { + @Override + public void completed(Response response) { + if (!response.getStatusInfo().equals(Response.Status.OK)) { + future.completeExceptionally(getApiException(response)); + } else { + future.complete(response.readEntity(FunctionStatus.class)); + } + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionStatus( String tenant, String namespace, String function, int id) throws PulsarAdminException { try { - Response response = request( - functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("status")) - .get(); - if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw getApiException(response); - } - return response.readEntity(FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData.class); - } catch (Exception e) { - throw getApiException(e); + return getFunctionStatusAsync(tenant, namespace, function, id).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } @Override + public CompletableFuture<FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData> getFunctionStatusAsync(String tenant, String namespace, String function, int id) { + WebTarget path = functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("status"); + final CompletableFuture<FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData> future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback<Response>() { + @Override + public void completed(Response response) { + if (!response.getStatusInfo().equals(Response.Status.OK)) { + future.completeExceptionally(getApiException(response)); + } else { + future.complete(response.readEntity( + FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData.class)); + } + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionStats(String tenant, String namespace, String function, int id) throws PulsarAdminException { try { - Response response = request( - functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("stats")).get(); - if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw getApiException(response); - } - return response.readEntity(FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class); - } catch (Exception e) { - throw getApiException(e); + return getFunctionStatsAsync(tenant, namespace, function, id).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } @Override + public CompletableFuture<FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData> getFunctionStatsAsync(String tenant, String namespace, String function, int id) { + WebTarget path = functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("stats"); + final CompletableFuture<FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData> future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback<Response>() { + @Override + public void completed(Response response) { + if (!response.getStatusInfo().equals(Response.Status.OK)) { + future.completeExceptionally(getApiException(response)); + } else { + future.complete(response.readEntity( + FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class)); + } + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override public FunctionStats getFunctionStats(String tenant, String namespace, String function) throws PulsarAdminException { try { - Response response = request( - functions.path(tenant).path(namespace).path(function).path("stats")).get(); - if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw getApiException(response); - } - return response.readEntity(FunctionStats.class); - } catch (Exception e) { - throw getApiException(e); - } } + return getFunctionStatsAsync(tenant, namespace, function).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture<FunctionStats> getFunctionStatsAsync(String tenant, String namespace, String function) { + WebTarget path = functions.path(tenant).path(namespace).path(function).path("stats"); + final CompletableFuture<FunctionStats> future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback<Response>() { + @Override + public void completed(Response response) { + if (!response.getStatusInfo().equals(Response.Status.OK)) { + future.completeExceptionally(getApiException(response)); + } else { + future.complete(response.readEntity(FunctionStats.class)); + } + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } @Override public void createFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException { try { + createFunctionAsync(functionConfig, fileName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture<Void> createFunctionAsync(FunctionConfig functionConfig, String fileName) { + final CompletableFuture<Void> future = new CompletableFuture<>(); + try { RequestBuilder builder = post(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()).getUri().toASCIIString()) .addBodyPart(new StringPart("functionConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig), MediaType.APPLICATION_JSON)); if (fileName != null && !fileName.startsWith("builtin://")) { // If the function code is built in, we don't need to submit here - builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM)); - } - org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).get(); - - if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { - throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build()); + builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM)); } + asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).toCompletableFuture().thenAccept(response -> { + if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { + future.completeExceptionally(getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build())); + } else { + future.complete(null); + } + }); } catch (Exception e) { - throw getApiException(e); + future.completeExceptionally(e); } + return future; } @Override public void createFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl) throws PulsarAdminException { try { - final FormDataMultiPart mp = new FormDataMultiPart(); + createFunctionWithUrlAsync(functionConfig, pkgUrl).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } - mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE)); + @Override + public CompletableFuture<Void> createFunctionWithUrlAsync(FunctionConfig functionConfig, String pkgUrl) { + WebTarget path = functions.path(functionConfig.getTenant()) + .path(functionConfig.getNamespace()).path(functionConfig.getName()); - mp.bodyPart(new FormDataBodyPart("functionConfig", - new Gson().toJson(functionConfig), - MediaType.APPLICATION_JSON_TYPE)); - request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName())) - .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class); - } catch (Exception e) { - throw getApiException(e); - } + final FormDataMultiPart mp = new FormDataMultiPart(); + mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE)); + mp.bodyPart(new FormDataBodyPart( + "functionConfig", new Gson().toJson(functionConfig), MediaType.APPLICATION_JSON_TYPE)); + + return asyncPostRequest(path, Entity.entity(mp, MediaType.MULTIPART_FORM_DATA)); } @Override public void deleteFunction(String cluster, String namespace, String function) throws PulsarAdminException { try { - request(functions.path(cluster).path(namespace).path(function)) - .delete(ErrorData.class); - } catch (Exception e) { - throw getApiException(e); + deleteFunctionAsync(cluster, namespace, function).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } @Override + public CompletableFuture<Void> deleteFunctionAsync(String tenant, String namespace, String function) { + WebTarget path = functions.path(tenant).path(namespace).path(function); + return asyncDeleteRequest(path); + } + + @Override public void updateFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException { updateFunction(functionConfig, fileName, null); } - @Override + @Override + public CompletableFuture<Void> updateFunctionAsync(FunctionConfig functionConfig, String fileName) { + return updateFunctionAsync(functionConfig, fileName, null); + } + + @Override public void updateFunction(FunctionConfig functionConfig, String fileName, UpdateOptions updateOptions) throws PulsarAdminException { try { + updateFunctionAsync(functionConfig, fileName, updateOptions) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture<Void> updateFunctionAsync(FunctionConfig functionConfig, String fileName, UpdateOptions updateOptions) { + final CompletableFuture<Void> future = new CompletableFuture<>(); + try { RequestBuilder builder = put(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()).getUri().toASCIIString()) .addBodyPart(new StringPart("functionConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig), MediaType.APPLICATION_JSON)); if (updateOptions != null) { - builder.addBodyPart(new StringPart("updateOptions", ObjectMapperFactory.getThreadLocal().writeValueAsString(updateOptions), MediaType.APPLICATION_JSON)); + builder.addBodyPart(new StringPart("updateOptions", ObjectMapperFactory.getThreadLocal().writeValueAsString(updateOptions), MediaType.APPLICATION_JSON)); } if (fileName != null && !fileName.startsWith("builtin://")) { // If the function code is built in, we don't need to submit here builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM)); } - org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).get(); - if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { - throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build()); - } + asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).toCompletableFuture().thenAccept(response -> { + if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { + future.completeExceptionally(getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build())); + } else { + future.complete(null); + } + }); } catch (Exception e) { - throw getApiException(e); + future.completeExceptionally(getApiException(e)); } + return future; } @Override public void updateFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl, UpdateOptions updateOptions) throws PulsarAdminException { try { - final FormDataMultiPart mp = new FormDataMultiPart(); + updateFunctionWithUrlAsync(functionConfig, pkgUrl, updateOptions) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + @Override + public CompletableFuture<Void> updateFunctionWithUrlAsync(FunctionConfig functionConfig, String pkgUrl, UpdateOptions updateOptions) { + final CompletableFuture<Void> future = new CompletableFuture<>(); + try { + final FormDataMultiPart mp = new FormDataMultiPart(); mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE)); - mp.bodyPart(new FormDataBodyPart( "functionConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig), MediaType.APPLICATION_JSON_TYPE)); - if (updateOptions != null) { mp.bodyPart(new FormDataBodyPart( "updateOptions", ObjectMapperFactory.getThreadLocal().writeValueAsString(updateOptions), MediaType.APPLICATION_JSON_TYPE)); } - - request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()) - .path(functionConfig.getName())).put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), - ErrorData.class); + WebTarget path = functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()) + .path(functionConfig.getName()); + return asyncPutRequest(path, Entity.entity(mp, MediaType.MULTIPART_FORM_DATA)); } catch (Exception e) { - throw getApiException(e); + future.completeExceptionally(getApiException(e)); } + return future; } @Override @@ -268,104 +491,229 @@ public class FunctionsImpl extends ComponentResource implements Functions { } @Override + public CompletableFuture<Void> updateFunctionWithUrlAsync(FunctionConfig functionConfig, String pkgUrl) { + return updateFunctionWithUrlAsync(functionConfig, pkgUrl, null); + } + + @Override public String triggerFunction(String tenant, String namespace, String functionName, String topic, String triggerValue, String triggerFile) throws PulsarAdminException { try { - final FormDataMultiPart mp = new FormDataMultiPart(); - if (triggerFile != null) { - mp.bodyPart(new FileDataBodyPart("dataStream", - new File(triggerFile), - MediaType.APPLICATION_OCTET_STREAM_TYPE)); - } - if (triggerValue != null) { - mp.bodyPart(new FormDataBodyPart("data", triggerValue, MediaType.TEXT_PLAIN_TYPE)); - } - if (topic != null && !topic.isEmpty()) { - mp.bodyPart(new FormDataBodyPart("topic", topic, MediaType.TEXT_PLAIN_TYPE)); - } - return request(functions.path(tenant).path(namespace).path(functionName).path("trigger")) - .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), String.class); - } catch (Exception e) { - throw getApiException(e); + return triggerFunctionAsync(tenant, namespace, functionName, topic, triggerValue, triggerFile) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture<String> triggerFunctionAsync(String tenant, String namespace, String function, String topic, String triggerValue, String triggerFile) { + final FormDataMultiPart mp = new FormDataMultiPart(); + if (triggerFile != null) { + mp.bodyPart(new FileDataBodyPart("dataStream", + new File(triggerFile), + MediaType.APPLICATION_OCTET_STREAM_TYPE)); + } + if (triggerValue != null) { + mp.bodyPart(new FormDataBodyPart("data", triggerValue, MediaType.TEXT_PLAIN_TYPE)); + } + if (topic != null && !topic.isEmpty()) { + mp.bodyPart(new FormDataBodyPart("topic", topic, MediaType.TEXT_PLAIN_TYPE)); + } + WebTarget path = functions.path(tenant).path(namespace).path(function).path("trigger"); + + final CompletableFuture<String> future = new CompletableFuture<>(); + try { + request(path).async().post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), new InvocationCallback<String>() { + + @Override + public void completed(String response) { + future.complete(response); + } + + @Override + public void failed(Throwable throwable) { + log.warn("[{}] Failed to perform http post request: {}", path.getUri(), throwable.getMessage()); + future.completeExceptionally(getApiException(throwable.getCause())); + } + + }); + } catch (PulsarAdminException cae) { + future.completeExceptionally(cae); } + return future; } @Override public void restartFunction(String tenant, String namespace, String functionName, int instanceId) throws PulsarAdminException { try { - request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId)) - .path("restart")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); - } catch (Exception e) { - throw getApiException(e); + restartFunctionAsync(tenant, namespace, functionName, instanceId) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } @Override + public CompletableFuture<Void> restartFunctionAsync(String tenant, String namespace, String function, int instanceId) { + WebTarget path = functions.path(tenant).path(namespace).path(function).path(Integer.toString(instanceId)) + .path("restart"); + return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + + @Override public void restartFunction(String tenant, String namespace, String functionName) throws PulsarAdminException { try { - request(functions.path(tenant).path(namespace).path(functionName).path("restart")) - .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); - } catch (Exception e) { - throw getApiException(e); + restartFunctionAsync(tenant, namespace, functionName) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } @Override + public CompletableFuture<Void> restartFunctionAsync(String tenant, String namespace, String function) { + WebTarget path = functions.path(tenant).path(namespace).path(function).path("restart"); + return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + + @Override public void stopFunction(String tenant, String namespace, String functionName, int instanceId) throws PulsarAdminException { try { - request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId)) - .path("stop")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); - } catch (Exception e) { - throw getApiException(e); + stopFunctionAsync(tenant, namespace, functionName, instanceId) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } @Override + public CompletableFuture<Void> stopFunctionAsync(String tenant, String namespace, String function, int instanceId) { + WebTarget path = functions.path(tenant).path(namespace).path(function).path(Integer.toString(instanceId)) + .path("stop"); + return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + + @Override public void stopFunction(String tenant, String namespace, String functionName) throws PulsarAdminException { try { - request(functions.path(tenant).path(namespace).path(functionName).path("stop")) - .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); - } catch (Exception e) { - throw getApiException(e); + stopFunctionAsync(tenant, namespace, functionName) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } @Override + public CompletableFuture<Void> stopFunctionAsync(String tenant, String namespace, String function) { + WebTarget path = functions.path(tenant).path(namespace).path(function).path("stop"); + return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + + @Override public void startFunction(String tenant, String namespace, String functionName, int instanceId) throws PulsarAdminException { try { - request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId)) - .path("start")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); - } catch (Exception e) { - throw getApiException(e); + startFunctionAsync(tenant, namespace, functionName, instanceId) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } @Override + public CompletableFuture<Void> startFunctionAsync(String tenant, String namespace, String function, int instanceId) { + WebTarget path = functions.path(tenant).path(namespace).path(function).path(Integer.toString(instanceId)) + .path("start"); + return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + + @Override public void startFunction(String tenant, String namespace, String functionName) throws PulsarAdminException { try { - request(functions.path(tenant).path(namespace).path(functionName).path("start")) - .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); - } catch (Exception e) { - throw getApiException(e); + startFunctionAsync(tenant, namespace, functionName) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } @Override + public CompletableFuture<Void> startFunctionAsync(String tenant, String namespace, String function) { + WebTarget path = functions.path(tenant).path(namespace).path(function).path("start"); + return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + + @Override public void uploadFunction(String sourceFile, String path) throws PulsarAdminException { try { + uploadFunctionAsync(sourceFile, path).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture<Void> uploadFunctionAsync(String sourceFile, String path) { + final CompletableFuture<Void> future = new CompletableFuture<>(); + try { RequestBuilder builder = post(functions.path("upload").getUri().toASCIIString()) .addBodyPart(new FilePart("data", new File(sourceFile), MediaType.APPLICATION_OCTET_STREAM)) .addBodyPart(new StringPart("path", path, MediaType.TEXT_PLAIN)); - org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).get(); - if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { - throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build()); - } + asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).toCompletableFuture() + .thenAccept(response -> { + if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { + future.completeExceptionally(getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build())); + } else { + future.complete(null); + } + }); } catch (Exception e) { - throw getApiException(e); + future.completeExceptionally(getApiException(e)); } + return future; } @Override @@ -374,12 +722,35 @@ public class FunctionsImpl extends ComponentResource implements Functions { } @Override + public CompletableFuture<Void> downloadFunctionAsync(String destinationPath, String tenant, String namespace, String functionName) { + return downloadFileAsync(destinationPath, functions.path(tenant).path(namespace).path(functionName).path("download")); + } + + @Override public void downloadFunction(String destinationPath, String path) throws PulsarAdminException { downloadFile(destinationPath, functions.path("download").queryParam("path", path)); } + @Override + public CompletableFuture<Void> downloadFunctionAsync(String destinationFile, String path) { + return downloadFileAsync(destinationFile, functions.path("download").queryParam("path", path)); + } + private void downloadFile(String destinationPath, WebTarget target) throws PulsarAdminException { - HttpResponseStatus status; + try { + downloadFileAsync(destinationPath, target).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + private CompletableFuture<Void> downloadFileAsync(String destinationPath, WebTarget target) { + final CompletableFuture<Void> future = new CompletableFuture<>(); try { File file = new File(destinationPath); if (!file.exists()) { @@ -389,7 +760,7 @@ public class FunctionsImpl extends ComponentResource implements Functions { RequestBuilder builder = get(target.getUri().toASCIIString()); - Future<HttpResponseStatus> whenStatusCode + CompletableFuture<HttpResponseStatus> statusFuture = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build(), new AsyncHandler<HttpResponseStatus>() { private HttpResponseStatus status; @@ -409,7 +780,6 @@ public class FunctionsImpl extends ComponentResource implements Functions { @Override public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception { - os.write(bodyPart.getBodyByteBuffer()); return State.CONTINUE; } @@ -422,17 +792,27 @@ public class FunctionsImpl extends ComponentResource implements Functions { @Override public void onThrowable(Throwable t) { } - }); - - status = whenStatusCode.get(); - os.close(); + }).toCompletableFuture(); + + statusFuture.thenAccept(status -> { + try { + os.close(); + } catch (Exception e) { + future.completeExceptionally(getApiException(e)); + return; + } - if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) { - throw getApiException(Response.status(status.getStatusCode()).entity(status.getStatusText()).build()); - } + if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) { + future.completeExceptionally( + getApiException(Response.status(status.getStatusCode()).entity(status.getStatusText()).build())); + } else { + future.complete(null); + } + }); } catch (Exception e) { - throw getApiException(e); + future.completeExceptionally(getApiException(e)); } + return future; } @Override @@ -474,30 +854,73 @@ public class FunctionsImpl extends ComponentResource implements Functions { public FunctionState getFunctionState(String tenant, String namespace, String function, String key) throws PulsarAdminException { try { - Response response = request(functions.path(tenant) - .path(namespace).path(function).path("state").path(key)).get(); - if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw getApiException(response); - } - return response.readEntity(FunctionState.class); - } catch (Exception e) { - throw getApiException(e); + return getFunctionStateAsync(tenant, namespace, function, key) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } @Override + public CompletableFuture<FunctionState> getFunctionStateAsync(String tenant, String namespace, String function, String key) { + WebTarget path = functions.path(tenant).path(namespace).path(function).path("state").path(key); + final CompletableFuture<FunctionState> future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback<Response>() { + @Override + public void completed(Response response) { + if (!response.getStatusInfo().equals(Response.Status.OK)) { + future.completeExceptionally(getApiException(response)); + } else { + future.complete(response.readEntity(FunctionState.class)); + } + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override public void putFunctionState(String tenant, String namespace, String function, FunctionState state) throws PulsarAdminException { try { - RequestBuilder builder = post(functions.path(tenant).path(namespace).path(function).path("state").path(state.getKey()).getUri().toASCIIString()); - builder.addBodyPart(new StringPart("state", ObjectMapperFactory.getThreadLocal().writeValueAsString(state), MediaType.APPLICATION_JSON)); - org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).get(); + putFunctionStateAsync(tenant, namespace, function, state).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture<Void> putFunctionStateAsync(String tenant, String namespace, String function, FunctionState state) { + final CompletableFuture<Void> future = new CompletableFuture<>(); + try { + RequestBuilder builder = post(functions.path(tenant).path(namespace).path(function).path("state").path(state.getKey()).getUri().toASCIIString()); + builder.addBodyPart(new StringPart("state", ObjectMapperFactory.getThreadLocal().writeValueAsString(state), MediaType.APPLICATION_JSON)); + asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).toCompletableFuture().thenAccept(response -> { + if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { + future.completeExceptionally(getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build())); + } else { + future.complete(null); + } + }); - if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) { - throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build()); - } } catch (Exception e) { - throw getApiException(e); + future.completeExceptionally(e); } + return future; } }