jiangpengcheng commented on code in PR #20455: URL: https://github.com/apache/pulsar/pull/20455#discussion_r1220693103
##########
pip/pip-272.md:
##########
@@ -0,0 +1,214 @@
+<!--
+RULES
+* Never place a link to an external site like Google Doc. The proposal should be in this issue entirely.
+* Use spelling and grammar checker tools if available to you (there are plenty of free ones).
+
+PROPOSAL HEALTH CHECK
+I can read the design document and understand the problem statement and what you plan to change *without* resorting to a couple of hours of code reading just to start having a high level understanding of the change.
+
+THESE COMMENTS
+Please remove them when done.
+-->
+
+# Background knowledge
+
+<!--
+Describes all the knowledge you need to know in order to understand all the other sections in this PIP
+
+* Give a high level explanation on all concepts you will be using throughout this document. For example, if you want to talk about Persistent Subscriptions, explain briefly (1 paragraph) what this is. If you're going to talk about Transaction Buffer, explain briefly what this is.
+  If you're going to change something specific, then go into more detail about it and how it works.
+* Provide links where possible if a person wants to dig deeper into the background information.
+
+DON'T
+* Do not include links *instead of* an explanation. Do provide links for further explanation.
+
+EXAMPLES
+* See [PIP-248](https://github.com/apache/pulsar/issues/19601), Background section to get an understanding on how you add the background knowledge needed.
+  (They also included the motivation there, but ignore it as we place that in Motivation section explicitly).
+-->
+
+In Pulsar, a Pulsar function supports storing state, such as a `WordCount` function which stores the state of its counters.
+
+```python
+from pulsar import Function
+
+class WordCount(Function):
+    def process(self, item, context):
+        for word in item.split():
+            context.incr_counter(word, 1)
+```
+
+Currently, Pulsar uses BookKeeper as the default state store.
We can also use other state stores, which can be configured in `conf/functions_worker.yml` using the field `stateStorageProviderImplementation`. This YAML file is parsed and loaded in Pulsar as the `WorkerConfig`.
+
+The `WorkerConfig` is used to configure the Pulsar functions worker and has two fields related to the state store:
+
+1. `stateStorageProviderImplementation`: The implementation class for the state store, which should implement the interface `StateStoreProvider`, such as `org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl`
+
+2. `stateStorageServiceUrl`: The service URL of the state storage, such as `bk://localhost:4181`
+
+`Runtime` and `RuntimeFactory`:
+
+Pulsar Functions support three kinds of runtime and, correspondingly, have three related `RuntimeFactory` implementations to create them:
+
+1. ThreadRuntime: Each instance runs as a thread; it loads the `JavaInstanceRunnable`.
+
+2. ProcessRuntime: Each instance runs as a process. For `Java` functions, it starts a `ProcessBuilder` with commands that launch `JavaInstanceStarter`, which loads the `ThreadRuntime` and finally the `JavaInstanceRunnable`. For other languages, the commands start the corresponding "instance".
+
+3. KubernetesRuntime: The function is submitted as a Kubernetes StatefulSet by workers, and each function instance runs as a pod. Similar to the process runtime, the StatefulSet's command starts the `JavaInstanceStarter` and finally loads the `JavaInstanceRunnable`; for other languages, the command starts the corresponding "instance".
+
+
+# Motivation
+
+<!--
+Describe the problem this proposal is trying to solve.
+
+* Explain what is the problem you're trying to solve - current situation.
+* This section is the "Why" of your proposal.
+-->
+
+Pulsar functions don't support configuring the `StateStoreProviderImplClass` with extra configurations. Although there is a `Map<String, Object> config` parameter in the `StateStoreProvider#init` method, it's initialized in `JavaInstanceRunnable` as a map with only one field, `stateStorageServiceUrl`.
+
+See:
+
+https://github.com/apache/pulsar/blob/master/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L366-L369
+
+As a result, a custom `StateStoreProvider` cannot get any other configuration from the `init` method.
+
+# Goals
+
+## In Scope
+
+<!--
+What this PIP intends to achieve once it's integrated into Pulsar.
+Why does it benefit Pulsar.
+-->
+
+Make `StateStoreProvider` "truly" configurable, so that a custom `StateStoreProvider` can be configured more easily and explicitly.
+
+## Out of Scope
+
+<!--
+Describe what you have decided to keep out of scope, perhaps left for a different PIP/s.
+-->
+
+
+# High Level Design
+
+<!--
+Describe the design of your solution in *high level*.
+Describe the solution end to end, from a birds-eye view.
+Don't go into implementation details in this section.
+
+I should be able to finish reading from beginning of the PIP to here (including) and understand the feature and
+how you intend to solve it, end to end.
+
+DON'T
+* Avoid code snippets, unless it's essential to explain your intent.
+-->
+
+Make `StateStoreProvider` configurable using custom configurations.

Review Comment:
Thanks for the improvements! As commented above, `stateStorageConfig` is a `Map<String, Object>` rather than a JSON string. Unlike other Pulsar plugins, a custom `StateStoreProvider` may need arbitrary fields for initialization, so we can only pass it a `Map<String, Object>` and let the operator decide which fields to provide in `functions_worker.yml`.

--
This is an automated message from the Apache Git Service.
