JackrayWang commented on issue #14536:
URL: https://github.com/apache/pulsar/issues/14536#issuecomment-1056811420
I constructed the URL request by referring to the following code:
```
@POST
@ApiOperation(value = "Creates a new Pulsar Function in cluster mode")
@ApiResponses(value = {
        @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
        @ApiResponse(code = 400, message = "Invalid request (The Pulsar Function already exists, etc.)"),
        @ApiResponse(code = 408, message = "Request timeout"),
        @ApiResponse(code = 200, message = "Pulsar Function successfully created")
})
@Path("/{tenant}/{namespace}/{functionName}")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public void registerFunction(
        @ApiParam(value = "The tenant of a Pulsar Function")
        final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of a Pulsar Function")
        final @PathParam("namespace") String namespace,
        @ApiParam(value = "The name of a Pulsar Function")
        final @PathParam("functionName") String functionName,
        final @FormDataParam("data") InputStream uploadedInputStream,
        final @FormDataParam("data") FormDataContentDisposition fileDetail,
        final @FormDataParam("url") String functionPkgUrl,
        @ApiParam(
            value = "You can submit a function (in any languages that you are familiar with) \n"
                    + "to a Pulsar cluster. Follow the steps below. \n"
                    + "1. Create a JSON object using some of the following parameters.\n"
                    + "A JSON value presenting configuration payload of a Pulsar Function.\n"
                    + " An example of the expected Pulsar Function can be found here.\n"
                    + "- **autoAck**\n"
                    + " Whether or not the framework acknowledges messages automatically.\n"
                    + "- **runtime**\n"
                    + " What is the runtime of the Pulsar Function. Possible Values: [JAVA, PYTHON, GO]\n"
                    + "- **resources**\n"
                    + " The size of the system resources allowed by the Pulsar Function runtime."
                    + " The resources include: cpu, ram, disk.\n"
                    + "- **className**\n"
                    + " The class name of a Pulsar Function.\n"
                    + "- **customSchemaInputs**\n"
                    + " The map of input topics to Schema class names (specified as a JSON object).\n"
                    + "- **customSerdeInputs**\n"
                    + " The map of input topics to SerDe class names (specified as a JSON object).\n"
                    + "- **deadLetterTopic**\n"
                    + " Messages that are not processed successfully are sent to `deadLetterTopic`.\n"
                    + "- **runtimeFlags**\n"
                    + " Any flags that you want to pass to the runtime."
                    + " Note that in thread mode, these flags have no impact.\n"
                    + "- **fqfn**\n"
                    + " The Fully Qualified Function Name (FQFN) for the Pulsar Function.\n"
                    + "- **inputSpecs**\n"
                    + " The map of input topics to its consumer configuration,"
                    + " each configuration has schema of "
                    + " {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\","
                    + " \"isRegexPattern\": true, \"receiverQueueSize\": 5}\n"
                    + "- **inputs**\n"
                    + " The input topic or topics (multiple topics can be specified as"
                    + " a comma-separated list) of a Pulsar Function.\n"
                    + "- **jar**\n"
                    + " Path to the JAR file for the Pulsar Function"
                    + " (if the Pulsar Function is written in Java). "
                    + " It also supports URL path [http/https/file (file protocol assumes that file "
                    + " already exists on worker host)] from which worker can download the package.\n"
                    + "- **py**\n"
                    + " Path to the main Python file or Python wheel file for the"
                    + " Pulsar Function (if the Pulsar Function is written in Python).\n"
                    + "- **go**\n"
                    + " Path to the main Go executable binary for the Pulsar Function"
                    + " (if the Pulsar Function is written in Go).\n"
                    + "- **logTopic**\n"
                    + " The topic to which the logs of a Pulsar Function are produced.\n"
                    + "- **maxMessageRetries**\n"
                    + " How many times should we try to process a message before giving up.\n"
                    + "- **output**\n"
                    + " The output topic of a Pulsar Function"
                    + " (If none is specified, no output is written).\n"
                    + "- **outputSerdeClassName**\n"
                    + " The SerDe class to be used for messages output by the Pulsar Function.\n"
                    + "- **parallelism**\n"
                    + " The parallelism factor of a Pulsar Function"
                    + " (i.e. the number of a Pulsar Function instances to run).\n"
                    + "- **processingGuarantees**\n"
                    + " The processing guarantees (that is, delivery semantics)"
                    + " applied to the Pulsar Function."
                    + " Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE]\n"
                    + "- **retainOrdering**\n"
                    + " Function consumes and processes messages in order.\n"
                    + "- **outputSchemaType**\n"
                    + " Represents either a builtin schema type (for example: 'avro', 'json', ect)"
                    + " or the class name for a Schema implementation."
                    + "- **subName**\n"
                    + " Pulsar source subscription name. User can specify a subscription-name"
                    + " for the input-topic consumer.\n"
                    + "- **windowConfig**\n"
                    + " The window configuration of a Pulsar Function.\n"
                    + "- **timeoutMs**\n"
                    + " The message timeout in milliseconds.\n"
                    + "- **topicsPattern**\n"
                    + " The topic pattern to consume from a list of topics under a namespace"
                    + " that match the pattern."
                    + " [input] and [topic-pattern] are mutually exclusive. Add SerDe class name for a "
                    + " pattern in customSerdeInputs (supported for java fun only)\n"
                    + "- **userConfig**\n"
                    + " A map of user-defined configurations (specified as a JSON object).\n"
                    + "- **secrets**\n"
                    + " This is a map of secretName(that is how the secret is going to be accessed"
                    + " in the Pulsar Function via context) to an object that"
                    + " encapsulates how the secret is fetched by the underlying secrets provider."
                    + " The type of an value here can be found by the"
                    + " SecretProviderConfigurator.getSecretObjectType() method. \n"
                    + "- **cleanupSubscription**\n"
                    + " Whether the subscriptions of a Pulsar Function created or used should be deleted"
                    + " when the Pulsar Function is deleted.\n"
                    + "2. Encapsulate the JSON object to a multipart object.",
            examples = @Example(
                value = {
                    @ExampleProperty(
                        mediaType = MediaType.TEXT_PLAIN,
                        value = " Example \n"
                                + "\n"
                                + " 1. Create a JSON object. \n"
                                + "\n"
                                + "{\n"
                                + "\t\"inputs\": \"persistent://public/default/input-topic\",\n"
                                + "\t\"parallelism\": \"4\",\n"
                                + "\t\"output\": \"persistent://public/default/output-topic\",\n"
                                + "\t\"log-topic\": \"persistent://public/default/log-topic\",\n"
                                + "\t\"classname\": \"org.example.test.ExclamationFunction\",\n"
                                + "\t\"jar\": \"java-function-1.0-SNAPSHOT.jar\"\n"
                                + "}\n"
                                + "\n"
                                + "\n"
                                + "2. Encapsulate the JSON object to a multipart object (in Python). \n"
                                + "\n"
                                + "from requests_toolbelt.multipart.encoder import MultipartEncoder \n"
                                + "mp_encoder = MultipartEncoder( \n"
                                + "\t[('functionConfig', "
                                + "(None, json.dumps(config), 'application/json'))])\n"
                    )
                }
            )
        )
        final @FormDataParam("functionConfig") FunctionConfig functionConfig) {
    functions().registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
            functionPkgUrl, functionConfig, clientAppId(), clientAuthData());
}
```
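
For illustration, here is a minimal Python sketch of the kind of multipart request this endpoint consumes, using only the standard library. The form field names `url` and `functionConfig` come from the `@FormDataParam` annotations above. Everything else is an assumption made for the example: the base URL, the tenant, namespace and function names, the package URL, and the config values. It has not been checked against a running broker, and nothing is sent over the network.

```python
import json
import uuid


def build_register_request(base_url, tenant, namespace, function_name,
                           config, package_url):
    """Return (url, headers, body) for a multipart/form-data POST.

    The path shape mirrors @Path("/{tenant}/{namespace}/{functionName}").
    """
    url = f"{base_url}/{tenant}/{namespace}/{function_name}"
    boundary = uuid.uuid4().hex

    # (form field name, content type, payload); field names are taken
    # from the @FormDataParam annotations in the quoted Java method.
    parts = [
        ("url", "text/plain", package_url),
        ("functionConfig", "application/json", json.dumps(config)),
    ]

    lines = []
    for name, content_type, payload in parts:
        lines += [
            f"--{boundary}",
            f'Content-Disposition: form-data; name="{name}"',
            f"Content-Type: {content_type}",
            "",  # blank line separates part headers from the payload
            payload,
        ]
    lines += [f"--{boundary}--", ""]  # closing boundary

    body = "\r\n".join(lines).encode("utf-8")
    headers = {"Content-Type": f"multipart/form-data; boundary={boundary}"}
    return url, headers, body


# Hypothetical values, chosen only to make the example concrete.
config = {
    "className": "org.example.test.ExclamationFunction",
    "output": "persistent://public/default/output-topic",
    "parallelism": 1,
}
url, headers, body = build_register_request(
    "http://localhost:8080/admin/v3/functions",  # assumed base URL
    "public", "default", "exclamation",
    config,
    "file:///path/to/java-function-1.0-SNAPSHOT.jar",  # assumed package URL
)
print(url)
```

The returned `url`, `headers` and `body` could then be handed to any HTTP client as a POST. Building the body by hand keeps the sketch dependency-free. The `requests_toolbelt` `MultipartEncoder` mentioned in the quoted annotation is the more convenient route in practice.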