jiangpengcheng commented on code in PR #20455:
URL: https://github.com/apache/pulsar/pull/20455#discussion_r1220665995
##########
pip/pip-272.md:
##########
@@ -0,0 +1,214 @@
+<!--
+RULES
+* Never place a link to an external site like Google Doc. The proposal should be in this issue entirely.
+* Use a spelling and grammar checker tools if available for you (there are plenty of free ones).
+
+PROPOSAL HEALTH CHECK
+I can read the design document and understand the problem statement and what you plan to change *without* resorting to a couple of hours of code reading just to start having a high level understanding of the change.
+
+THIS COMMENTS
+Please remove them when done.
+-->
+
+# Background knowledge
+
+<!--
+Describes all the knowledge you need to know in order to understand all the other sections in this PIP
+
+* Give a high level explanation on all concepts you will be using throughout this document. For example, if you want to talk about Persistent Subscriptions, explain briefly (1 paragraph) what this is. If you're going to talk about Transaction Buffer, explain briefly what this is.
+  If you're going to change something specific, then go into more detail about it and how it works.
+* Provide links where possible if a person wants to dig deeper into the background information.
+
+DON'T
+* Do not include links *instead* explanation. Do provide links for further explanation.
+
+EXAMPLES
+* See [PIP-248](https://github.com/apache/pulsar/issues/19601), Background section to get an understanding on how you add the background knowledge needed.
+  (They also included the motivation there, but ignore it as we place that in Motivation section explicitly).
+-->
+
+In Pulsar, a Pulsar function can store state. For example, a `WordCount` function stores the state of its counters:
+
+```python
+from pulsar import Function
+
+class WordCount(Function):
+    def process(self, item, context):
+        for word in item.split():
+            context.incr_counter(word, 1)
+```
+
+Currently, Pulsar uses BookKeeper as the default state store.
+Other state stores can be configured in `conf/functions_worker.yml` through the `stateStorageProviderImplementation` field. This YAML file is parsed and loaded in Pulsar as the `WorkerConfig`.
+
+The `WorkerConfig` is used to configure the Pulsar Functions worker and has two fields related to the state store:
+
+1. `stateStorageProviderImplementation`: The implementation class of the state store, which should implement the `StateStoreProvider` interface, for example `org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl`.
+
+2. `stateStorageServiceUrl`: The service URL of the state storage, for example `bk://localhost:4181`.
+
+`Runtime` and `RuntimeFactory`:
+
+Pulsar Functions supports three kinds of runtime and, correspondingly, three `RuntimeFactory` implementations to create them:
+
+1. `ThreadRuntime`: Each instance runs as a thread, which loads the `JavaInstanceRunnable`.
+
+2. `ProcessRuntime`: Each instance runs as a process. For a Java function, the runtime uses a `ProcessBuilder` to run commands that start `JavaInstanceStarter`, which loads the `ThreadRuntime` and finally the `JavaInstanceRunnable`. For other languages, the commands start the corresponding language-specific instance.
+
+3. `KubernetesRuntime`: Workers submit each function as a Kubernetes StatefulSet, and each function instance runs as a pod. Similar to the process runtime, the StatefulSet's command starts `JavaInstanceStarter`, which finally loads the `JavaInstanceRunnable`. For other languages, the command starts the corresponding language-specific instance.
+
+
+# Motivation
+
+<!--
+Describe the problem this proposal is trying to solve.
+
+* Explain what is the problem you're trying to solve - current situation.
+* This section is the "Why" of your proposal.
+-->
+
+Pulsar Functions does not support passing extra configurations to the `StateStoreProviderImplClass`. Although the `StateStoreProvider#init` method has a `Map<String, Object> config` parameter, `JavaInstanceRunnable` initializes it as a map with only one field, `stateStorageServiceUrl`.
+
+See:
+
+https://github.com/apache/pulsar/blob/master/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L366-L369
+
+As a result, a custom `StateStoreProvider` cannot receive any other configuration through its `init` method.
+
+# Goals
+
+## In Scope
+
+<!--
+What this PIP intend to achieve once It's integrated into Pulsar.
+Why does it benefit Pulsar.
+-->
+
+Make `StateStoreProvider` truly configurable, so that a custom `StateStoreProvider` can be configured more easily and explicitly.
+
+## Out of Scope
+
+<!--
+Describe what you have decided to keep out of scope, perhaps left for a different PIP/s.
+-->
+
+
+# High Level Design
+
+<!--
+Describe the design of your solution in *high level*.
+Describe the solution end to end, from a birds-eye view.
+Don't go into implementation details in this section.
+
+I should be able to finish reading from beginning of the PIP to here (including) and understand the feature and
+how you intend to solve it, end to end.
+
+DON'T
+* Avoid code snippets, unless it's essential to explain your intent.
+-->
+
+Make `StateStoreProvider` configurable through custom configurations.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+<!--
+This is the section where you dive into the details. It can be:
+* Concrete class names and their roles and responsibility, including methods.
+* Code snippets of existing code.
+* Interface names and its methods.
+* ...
+-->
+
+1. Add a `stateStorageConfig` field to `WorkerConfig`.
+2. In all three runtime factories, pass this config to the created runtime.
+3. Add a new CLI argument to `JavaInstanceStarter` and `LocalRunner` so that the process and Kubernetes runtimes can pass state-related configurations to them.
+4. Add a new state config parameter to `JavaInstanceRunnable` to accept the state-provider-related configurations.
+
+## Public-facing Changes
+
+<!--
+Describe the additions you plan to make for each public facing component.
+Remove the sections you are not changing.
+Clearly mark any changes which are BREAKING backward compatability.
+-->
+
+### Public API
+<!--
+When adding a new endpoint to the REST API, please make sure to document the following:
+
+* path
+* query parameters
+* HTTP body parameters, usually as JSON.
+* Response codes, and for each what they mean.
+  For each response code, please include a detailed description of the response body JSON, specifying each field and what it means.
+  This is the place to document the errors.
+-->
+
+### Binary protocol
+
+### Configuration
+
+- A new `Map<String, Object>` field, such as `stateStorageConfig`, is added to `WorkerConfig`

Review Comment:
No. `WorkerConfig` is a Java class, so this field is a `Map<String, Object>` rather than a JSON string. `WorkerConfig` already has other `Map<String, Object>` fields, such as `functionRuntimeFactoryConfigs`, `runtimeCustomizerConfig` and `functionsWorkerServiceCustomConfigs`.

On the user (operator) side, they only need to edit `functions_worker.yml` to provide `stateStorageConfig`, which is parsed and loaded into the `WorkerConfig`. For example:

```yaml
stateStorageConfig:
  username: xxx
  password: xxx
```
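To make the flow concrete, here is a minimal Java sketch of how the entries of `stateStorageConfig` could reach a custom provider. It is a hypothetical illustration, not the PR's implementation: `StateStorageConfigSketch`, `buildProviderConfig` and `CredentialedProvider` are invented names, and the real `StateStoreProvider#init` signature may differ. It assumes the worker copies every `stateStorageConfig` entry into the map it passes to `init`, next to the existing `stateStorageServiceUrl` entry, so that a provider can read keys such as `username` and `password` from the YAML example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: none of these class or method names are Pulsar APIs.
public class StateStorageConfigSketch {

    // Same key name as the existing WorkerConfig field.
    static final String STATE_STORAGE_SERVICE_URL = "stateStorageServiceUrl";

    // Builds the map a state store provider would receive in init(...):
    // every entry from stateStorageConfig plus the service URL.
    static Map<String, Object> buildProviderConfig(String serviceUrl,
                                                   Map<String, Object> stateStorageConfig) {
        Map<String, Object> config = new HashMap<>();
        if (stateStorageConfig != null) {
            config.putAll(stateStorageConfig);
        }
        if (serviceUrl != null) {
            // Assumption: the service URL wins over a same-named custom key.
            config.put(STATE_STORAGE_SERVICE_URL, serviceUrl);
        }
        return Collections.unmodifiableMap(config);
    }

    // Hypothetical custom provider that needs credentials from the config map.
    static final class CredentialedProvider {
        private String username;
        private String password;

        void init(Map<String, Object> config) {
            Object user = config.get("username");
            Object pass = config.get("password");
            if (user == null || pass == null) {
                throw new IllegalArgumentException("username and password are required");
            }
            this.username = user.toString();
            this.password = pass.toString();
        }

        String username() {
            return username;
        }

        boolean hasPassword() {
            return password != null;
        }
    }

    public static void main(String[] args) {
        // Mirrors the YAML example: stateStorageConfig with username and password.
        Map<String, Object> fromYaml = new HashMap<>();
        fromYaml.put("username", "xxx");
        fromYaml.put("password", "xxx");

        Map<String, Object> config = buildProviderConfig("bk://localhost:4181", fromYaml);
        CredentialedProvider provider = new CredentialedProvider();
        provider.init(config);
        System.out.println(provider.username() + " -> " + config.get(STATE_STORAGE_SERVICE_URL));
    }
}
```

In this sketch, `stateStorageServiceUrl` overrides a same-named key in `stateStorageConfig`, so a provider that only reads the URL sees the same value it does today; the PIP does not say which entry should win, so that precedence is an assumption.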
