cbornet opened a new issue, #16739:
URL: https://github.com/apache/pulsar/issues/16739
## Motivation
Pulsar IO connectors make it possible to connect Pulsar to an external
system:
* A Source reads continuously from an external system and writes to a Pulsar
topic
* A Sink reads continuously from a Pulsar topic and writes to an external
system.
Sources and Sinks are written in Java.
Pulsar also has a lightweight computing system named Pulsar Functions. A
Pulsar Function reads from one or more topics, applies user logic written in
Java, Python or Go and writes to an output topic.
When using Pulsar IO connectors, the format of what is read/written from/to
the source/sink is defined by the connector code. But there are a lot of
situations where a user wants to transform this data before using it. Currently
the solution is to either :
* write a custom connector that transforms the data the way we want but that
means writing a lot of code without reuse, packaging and managing custom
connectors and so on..
* write a Function to transform the data after it was written to a topic by
a Source or before it is read from a topic by a Sink. This is not very
efficient as we have to use an intermediate topic, which means additional
storage, IO, and latency.
Considering all this, it would be handy to be able to apply a Function
on-the-fly to a connector without going through an intermediary topic.
## Goal
This PIP defines the changes needed to be able to apply a preprocessing
Function on-the-fly to a Sink.
The preprocessing function can be a built-in function, a package function,
or loaded through an http URL or a file path.
Sources, Sinks and Functions are based on the same runtime process that:
* reads from a Source. For Sinks and Functions this Source is a PulsarSource
consuming from a Pulsar topic
* applies a Function. For Sources and Sinks, this Function is
IdentityFunction which returns the data it gets without modification.
* writes to a Sink. For Sources and Functions, this Sink is a PulsarSink
writing to a Pulsar topic.
This PIP reuses this and allows configuring a Function different from
IdentityFunction to Sinks.
Only Functions returning a Record will be authorized to ensure that the
Function sets the Schema explicitly.
Out of the scope of this PIP, for future work:
* Applying a post-processing Function to a Source
* Loading the Function jar through the Sink CLI
## API Changes
### Admin CLI
The following options will be added to the `pulsar-admin sinks` CLI
`create`, `update` and `localrun`:
* `preprocess-function`: the preprocess function applied before the Sink.
Starts by `builtin://` for built-in functions, `function://` for package
function, `http://` or `file://`
* `preprocess-function-classname`: the preprocess function class name
(optional if the function is a NAR)
* `preprocess-function-config`: the configuration of the preprocess function
in the same format as the `user-config` parameter of the `functions create` CLI
command.
The corresponding fields will be added to `SinkConfig`:
```java
private String preprocessFunction;
private String preprocessFunctionClassName;
private String preprocessFunctionConfig;
```
### Function definition
The field `extraFunctionPackageLocation` to the protobuf structure
`FunctionMetaData` will be added. This field will be filled with the location
of the extra function to apply when registering a sink and used in the Runtime
to load the function code.
```protobuf
message FunctionMetaData {
...
PackageLocationMetaData extraFunctionPackageLocation = 7;
}
```
### Runtime
The parameters `extraFunctionFile` and `originalExtraFunctionFileName` will
be added to `RuntimeFactory::createContainer`
```java
Runtime createContainer(
InstanceConfig instanceConfig, String codeFile, String
originalCodeFileName,
String extraFunctionFile, String originalExtraFunctionFileName,
Long expectedHealthCheckInterval) throws Exception;
```
### Instance function cache
A field `extraFunctionId` to `InstanceConfig` that will hold the UUID cache
key of the extra function will be added.
```java
public class InstanceConfig {
private int instanceId;
private String functionId;
private String extraFunctionId;
```
### JavaInstanceStarter
The following parameters will be added to JavaInstanceStarter:
* `--extra_function_jar`: the path to the extra function jar
* `--extra_function_id`: the extra function UUID cache key
These parameters are then used by the `ThreadRuntime` to load the function
from the `FunctionCacheManager` or create it there if needed.
### Download the extra function
The statefulset spawned in `KubernetesRuntime` needs to be able to download
the extra functions code via API.
An `extra-function` query param will be added to the download function HTTP
endpoint
```java
@Path("/{tenant}/{namespace}/{functionName}/download")
public StreamingOutput downloadFunction(
@ApiParam(value = "The tenant of functions")
final @PathParam("tenant") String tenant,
@ApiParam(value = "The namespace of functions")
final @PathParam("namespace") String namespace,
@ApiParam(value = "The name of functions")
final @PathParam("functionName") String functionName) {
final @PathParam("functionName") String functionName,
@ApiParam(value = "Whether to download the extra-function")
final @QueryParam("extra-function") boolean extraFunction) {
```
If `extraFunction` is `true` then the extra function will be returned
instead of the sink.
The Java admin SDK will have the following methods added:
```java
/**
* Download Function Code.
*
* @param destinationFile
* file where data should be downloaded to
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param function
* Function name
* @param extraFunction
* Whether to download the extra-function (for sources and
sinks)
* @throws PulsarAdminException
*/
void downloadFunction(String destinationFile, String tenant, String
namespace, String function,
boolean extraFunction) throws PulsarAdminException;
/**
* Download Function Code asynchronously.
*
* @param destinationFile
* file where data should be downloaded to
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param function
* Function name
* @param extraFunction
* Whether to download the extra-function (for sources and
sinks)
*/
CompletableFuture<Void> downloadFunctionAsync(
String destinationFile, String tenant, String namespace, String
function, boolean extraFunction);
```
The parameter `--extra-function` will be added to the admin CLI command
`functions download`
## Implementation
### Pulsar-admin
* Add the admin CLI options when creating/updating/localrunning the sink
(see API changes)
### Pulsar broker
* On the broker API, in registerSink/updateSink, if a preprocessing function
is present in the Sink config, we:
* validate the function
* get the function classloader (from builtin or download a package file)
* load the function
* inspect the function types and set the first arg as Sink type. Also
verify that the second arg is of type Record.
* use the function classloader instead of the sink classloader to verify
if custom schemas, serdes, crypto key readers can be loaded and are conform.
* get the function package location and fill the protobuf
extraFunctionPackageLocation field with it. A name for this preprocessing
function is generated from the sink name so it can be referenced when stored in
BookKeeper or in package management. The name of the preprocessing function is
`{sink name}__sink-function`.
* set the `functionDetails` with the preprocessing function config
(function class name and function userConfig)
* The `--extra-function` query parameter is added to the `functions
download` CLI command, admin SDK and HTTP API (see API changes).
### Function worker
* When the `InstanceConfig` is created, an UUID is set to the
`extraFunctionId` field. This field will serve as a cache key for the extra
function (see API changes).
* When the `FunctionActioner` starts the function, if
`extraFunctionPackageLocation` is present, the same is done for the extra
function as what is done for the connector:
* if the runtime is not externally managed, the extra function code is
downloaded from the `extraFunctionPackageLocation` and the `Runtime` is created
with the extra package file path and original name (see API changes to
`RuntimeFactory::createContainer`)
* if the runtime is externally managed, the `Runtime` is created with the
`extraFunctionPackageLocation` and original name.
* Depending on the configured runtime, if there’s an extra function file:
* For the `ThreadRuntime`, the extra function classloader is obtained with
the instance `extraFunctionId` cache key, then this classloader is passed to
the `JavaInstanceRunnable`. The `JavaInstanceRunnable` then switches between
the connector classloader and the extra function classloader accordingly..
* For the `ProcessRuntime`, the path to the extra function jar is added to
the `--extra_function_jar` parameter in the `JavaInstanceStarter` command. The
`JavaInstanceStarter` then uses it when creating its `ThreadRuntime`.
* For the `KubernetesRuntime`, a command is added in the statefulset exec
command to download the extra function using the `–extra-function` flag of the
`functions download` command. And the path to this downloaded jar is added to
the `--extra_function_jar` parameter of the `JavaInstanceStarter` command.
### LocalRunner
If `sinkConfig` has a `preprocessFunction`, the `LocalRunner` will use the
same methods as in the broker to get the function file and `functionDetails`
and use them when spawning the `Runtime`.
## Reject Alternatives
N/A
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]