This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 34eb74d37b6 Add CLI command to get available built-in functions (#16822)
34eb74d37b6 is described below
commit 34eb74d37b69ef4320bd3f7d3bd323d8fda9564a
Author: Christophe Bornet <[email protected]>
AuthorDate: Sun Aug 28 07:58:29 2022 +0200
Add CLI command to get available built-in functions (#16822)
---
.../pulsar/broker/admin/impl/FunctionsBase.java | 18 +++++++++++++
.../org/apache/pulsar/client/admin/Functions.java | 14 ++++++++++
.../common/functions/FunctionDefinition.java | 0
.../client/admin/internal/FunctionsImpl.java | 30 ++++++++++++++++++++++
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 15 +++++++++++
.../pulsar/functions/worker/FunctionsManager.java | 8 ++++++
.../functions/worker/rest/api/FunctionsImpl.java | 14 ++++++++++
.../worker/rest/api/v3/FunctionsApiV3Resource.java | 18 +++++++++++++
.../functions/worker/service/api/Functions.java | 6 ++++-
9 files changed, 122 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index f43e8227cfc..d5380407491 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -41,6 +41,7 @@ import javax.ws.rs.core.StreamingOutput;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.io.ConnectorDefinition;
@@ -748,6 +749,23 @@ public class FunctionsBase extends AdminResource {
functions().reloadBuiltinFunctions(clientAppId(), clientAuthData());
}
+ @GET
+ @ApiOperation(
+ value = "Fetches the list of built-in Pulsar functions",
+ response = FunctionDefinition.class,
+ responseContainer = "List"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have
admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 408, message = "Request timeout")
+ })
+ @Path("/builtins")
+ @Produces(MediaType.APPLICATION_JSON)
+ public List<FunctionDefinition> getBuiltinFunction() {
+ return functions().getBuiltinFunctions(clientAppId(),
clientAuthData());
+ }
+
@PUT
@ApiOperation(value = "Updates a Pulsar Function on the worker leader",
hidden = true)
@ApiResponses(value = {
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java
index bbbfcb85268..5bf289f9b65 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -25,6 +25,7 @@ import
org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedExceptio
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.io.ConnectorDefinition;
@@ -792,6 +793,19 @@ public interface Functions {
@Deprecated
Set<String> getSinks() throws PulsarAdminException;
+ /**
+ * Fetches a list of supported Pulsar Functions currently running in cluster mode.
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ List<FunctionDefinition> getBuiltInFunctions() throws PulsarAdminException;
+
+ /**
+ * Fetches a list of supported Pulsar Functions currently running in cluster mode asynchronously.
+ */
+ CompletableFuture<List<FunctionDefinition>> getBuiltInFunctionsAsync();
+
/**
* Fetch the current state associated with a Pulsar Function.
* <p/>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionDefinition.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionDefinition.java
similarity index 100%
rename from pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionDefinition.java
rename to pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionDefinition.java
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 0e96da58432..ff2c22daae5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
@@ -723,6 +724,35 @@ public class FunctionsImpl extends ComponentResource
implements Functions {
.map(ConnectorDefinition::getName).collect(Collectors.toSet());
}
+ @Override
+ public List<FunctionDefinition> getBuiltInFunctions() throws
PulsarAdminException {
+ return sync(this::getBuiltInFunctionsAsync);
+ }
+
+ @Override
+ public CompletableFuture<List<FunctionDefinition>>
getBuiltInFunctionsAsync() {
+ WebTarget path = functions.path("builtins");
+ final CompletableFuture<List<FunctionDefinition>> future = new
CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Response>() {
+ @Override
+ public void completed(Response response) {
+ if (response.getStatus() !=
Response.Status.OK.getStatusCode()) {
+
future.completeExceptionally(getApiException(response));
+ } else {
+ future.complete(response.readEntity(
+ new
GenericType<List<FunctionDefinition>>() {}));
+ }
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+
future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
public List<WorkerInfo> getCluster() throws PulsarAdminException {
try {
return request(functions.path("cluster")).get(new
GenericType<List<WorkerInfo>>() {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index b007106b2a8..a37197ccc28 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -44,6 +44,7 @@ import java.util.function.Supplier;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.text.WordUtils;
import org.apache.pulsar.admin.cli.utils.CmdUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -1245,6 +1246,7 @@ public class CmdFunctions extends CmdBase {
jcommander.addCommand("upload", getUploader());
jcommander.addCommand("download", getDownloader());
jcommander.addCommand("reload", new ReloadBuiltInFunctions());
+ jcommander.addCommand("available-functions", new
ListBuiltInFunctions());
}
@VisibleForTesting
@@ -1334,4 +1336,17 @@ public class CmdFunctions extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get the list of Pulsar Functions
supported by Pulsar cluster")
+ public class ListBuiltInFunctions extends BaseCommand {
+ @Override
+ void runCmd() throws Exception {
+ getAdmin().functions().getBuiltInFunctions()
+ .forEach(function -> {
+ System.out.println(function.getName());
+
System.out.println(WordUtils.wrap(function.getDescription(), 80));
+
System.out.println("----------------------------------------");
+ });
+ }
+ }
+
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
index 0f1c0fcc835..5937d09eaa0 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/FunctionsManager.java
@@ -20,8 +20,11 @@ package org.apache.pulsar.functions.worker;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.List;
import java.util.TreeMap;
+import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
@@ -42,6 +45,11 @@ public class FunctionsManager {
return functions.get(functionType).getArchivePath();
}
+ public List<FunctionDefinition> getFunctionDefinitions() {
+ return
functions.values().stream().map(FunctionArchive::getFunctionDefinition)
+ .collect(Collectors.toList());
+ }
+
public void reloadFunctions(WorkerConfig workerConfig) throws IOException {
this.functions =
FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory());
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 8847454973e..3e7f0ab7339 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -44,6 +44,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.functions.WorkerInfo;
@@ -787,6 +788,19 @@ public class FunctionsImpl extends ComponentImpl
implements Functions<PulsarWork
worker().getFunctionsManager().reloadFunctions(worker().getWorkerConfig());
}
+ @Override
+ public List<FunctionDefinition> getBuiltinFunctions(String clientRole,
+
AuthenticationDataSource authenticationData) {
+ if (!isWorkerServiceAvailable()) {
+ throwUnavailableException();
+ }
+
+ if (worker().getWorkerConfig().isAuthorizationEnabled() &&
!isSuperUser(clientRole, authenticationData)) {
+ throw new RestException(Response.Status.UNAUTHORIZED, "Client is
not authorized to perform operation");
+ }
+ return this.worker().getFunctionsManager().getFunctionDefinitions();
+ }
+
private Function.FunctionDetails validateUpdateRequestParams(final String
tenant,
final String
namespace,
final String
componentName,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
index 9eb02e4f165..3efad1da1cc 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
@@ -38,6 +38,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.StreamingOutput;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.io.ConnectorDefinition;
@@ -367,6 +368,23 @@ public class FunctionsApiV3Resource extends
FunctionApiResource {
functions().reloadBuiltinFunctions(clientAppId(), clientAuthData());
}
+ @GET
+ @ApiOperation(
+ value = "Fetches the list of built-in Pulsar functions",
+ response = FunctionDefinition.class,
+ responseContainer = "List"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin
permissions"),
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 408, message = "Request timeout")
+ })
+ @Path("/builtins")
+ @Produces(MediaType.APPLICATION_JSON)
+ public List<FunctionDefinition> getBuiltinFunctions() {
+ return functions().getBuiltinFunctions(clientAppId(),
clientAuthData());
+ }
+
@GET
@Path("/{tenant}/{namespace}/{functionName}/state/{key}")
public FunctionState getFunctionState(final @PathParam("tenant") String
tenant,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java
index 90d4c1f7a10..e63cb591475 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java
@@ -21,9 +21,11 @@ package org.apache.pulsar.functions.worker.service.api;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
+import java.util.List;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import
org.apache.pulsar.common.policies.data.FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData;
@@ -161,7 +163,9 @@ public interface Functions<W extends WorkerService> extends
Component<W> {
String clientRole,
AuthenticationDataSource clientAuthenticationDataHttps);
-
void reloadBuiltinFunctions(String clientRole, AuthenticationDataSource
clientAuthenticationDataHttps)
throws IOException;
+
+ List<FunctionDefinition> getBuiltinFunctions(String clientRole,
+ AuthenticationDataSource
clientAuthenticationDataHttps);
}