GitHub user jerrypeng added a comment to the discussion: Pulsar functions should be able to return none and multiple values
@KIC thank you for taking the time to think about how to improve Pulsar Functions! Just to clarify some things:

> As I understand it, the current implementation of Pulsar Functions is a 1:1 relationship: one event in -> one event out. This is very limiting, as one cannot even write a function to filter out events. There are also use cases where you get a batched message and need to "unpack" it into single events, or where you need to interpolate values from a previous event (which is held in the state).

You are able to return `null` from a function for filtering purposes.

> I propose that the interface should be something along the lines of `Function<I, ? extends Collection<O>>`.

We can have such an interface.

> If you consider a PublishFunction, then I see the following problem. The moment you also need to store state via the Context, you get a timing issue. What if you stored the state but then, for some reason, you are not able to send to the topic? Or, even worse, what if you could send n of m messages and then the network fails? It would be cleaner and easier if Pulsar handled all these cases outside of the function implementation.

To send a message to any topic from a function:

```java
// Publish `output` to `publishTopic` using the string schema
context.newOutputMessage(publishTopic, Schema.STRING).value(output).sendAsync();
```

This method returns a `CompletableFuture`. You can always wait for that future to complete before updating the state. If a send fails, throw an exception; with the EFFECTIVELY_ONCE processing guarantee, the function instance will restart itself and replay the last message. Thus, your state doesn't get updated for a message that didn't get sent out. Of course, updating the state could in theory fail, in which case you would have sent a message but not updated the state.

Alternatively, you can use another Pulsar topic as a K/V state store and publish state updates to that topic. By using message sequence IDs and idempotent producing, you can achieve exactly-once state updates.
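To make the filtering point concrete, here is a minimal sketch of a function that drops events by returning `null`. So that it compiles without Pulsar on the classpath, `SimpleFunction` is a hypothetical, simplified stand-in for Pulsar's `Function<I, O>` (the real `process` also receives a `Context`), and `run` is a hypothetical loop that imitates a runtime publishing only non-null results.

```java
import java.util.ArrayList;
import java.util.List;

public class FilterSketch {

    // Hypothetical, simplified stand-in for org.apache.pulsar.functions.api.Function<I, O>;
    // the real process() also takes a Context and may throw Exception.
    public interface SimpleFunction<I, O> {
        O process(I input);
    }

    // Keeps only error events; returning null means "publish nothing" for this input.
    public static class ErrorFilter implements SimpleFunction<String, String> {
        @Override
        public String process(String input) {
            return input.startsWith("ERROR") ? input : null;
        }
    }

    // Hypothetical runtime loop: only non-null results are treated as published.
    public static <I, O> List<O> run(SimpleFunction<I, O> fn, List<I> inputs) {
        List<O> published = new ArrayList<>();
        for (I input : inputs) {
            O output = fn.process(input);
            if (output != null) {
                published.add(output);
            }
        }
        return published;
    }

    public static void main(String[] args) {
        List<String> out = run(new ErrorFilter(), List.of("INFO ok", "ERROR disk full", "DEBUG x"));
        System.out.println(out); // prints [ERROR disk full]
    }
}
```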
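For the proposed `Function<I, ? extends Collection<O>>` shape, a minimal sketch might look like the following. `MultiOutputFunction`, `BatchUnpacker`, and `run` are hypothetical names, not an existing Pulsar interface. The assumed contract is that each element of the returned collection becomes one output message, so an empty collection filters the event and a batched payload such as `a;b;c` is unpacked into separate events.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class MultiOutputSketch {

    // Hypothetical interface along the lines of the proposed Function<I, ? extends Collection<O>>.
    public interface MultiOutputFunction<I, O> {
        Collection<O> process(I input);
    }

    // Unpacks a batched payload like "a;b;c" into individual events.
    public static class BatchUnpacker implements MultiOutputFunction<String, String> {
        @Override
        public Collection<String> process(String input) {
            if (input == null || input.trim().isEmpty()) {
                return Collections.emptyList(); // zero outputs: acts as a filter
            }
            return Arrays.asList(input.split(";"));
        }
    }

    // Hypothetical runtime loop: every element of the returned collection is one output message.
    public static <I, O> List<O> run(MultiOutputFunction<I, O> fn, List<I> inputs) {
        List<O> published = new ArrayList<>();
        for (I input : inputs) {
            published.addAll(fn.process(input));
        }
        return published;
    }

    public static void main(String[] args) {
        System.out.println(run(new BatchUnpacker(), List.of("a;b", "", "c"))); // prints [a, b, c]
    }
}
```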
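The send-then-update ordering described above can be sketched without a Pulsar dependency. `Sender` is a hypothetical stand-in for `context.newOutputMessage(topic, Schema.STRING).value(v).sendAsync()` (the real call also returns a `CompletableFuture`), and the `HashMap` stands in for function state; both are assumptions made so the example runs on its own. The point it illustrates is that `join()` rethrows a failed send before the state is touched, so even "n of m sent" leaves the state unchanged.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class SendThenUpdateState {

    // Hypothetical stand-in for context.newOutputMessage(topic, Schema.STRING).value(v).sendAsync().
    public interface Sender {
        CompletableFuture<Void> sendAsync(String topic, String value);
    }

    private final Sender sender;
    // Hypothetical stand-in for the function's state store.
    private final Map<String, Long> state = new HashMap<>();

    public SendThenUpdateState(Sender sender) {
        this.sender = sender;
    }

    public Map<String, Long> state() {
        return state;
    }

    // Sends all outputs, waits for every send to finish, and only then updates state.
    public void process(String key, List<String> outputs, String topic) {
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (String output : outputs) {
            pending.add(sender.sendAsync(topic, output));
        }
        // join() throws CompletionException if any send failed, so the state
        // update below is skipped and the exception propagates to the caller.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
        state.merge(key, (long) outputs.size(), Long::sum);
    }

    public static void main(String[] args) {
        SendThenUpdateState ok =
            new SendThenUpdateState((topic, value) -> CompletableFuture.completedFuture(null));
        ok.process("events", List.of("a", "b"), "out-topic");
        System.out.println(ok.state()); // prints {events=2}

        SendThenUpdateState failing = new SendThenUpdateState(
            (topic, value) -> CompletableFuture.failedFuture(new RuntimeException("send failed")));
        try {
            failing.process("events", List.of("a"), "out-topic");
        } catch (CompletionException e) {
            // A real function would let this escape so the runtime can restart and replay the input.
            System.out.println("state untouched: " + failing.state()); // prints state untouched: {}
        }
    }
}
```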
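The topic-as-state-store alternative can be sketched as follows, assuming each state update is published with a producer name and a monotonically increasing sequence ID. `TopicStateStore`, `StateUpdate`, and the counter semantics are hypothetical illustrations, not a Pulsar API, and the in-memory list stands in for replaying the state topic from the beginning. When message deduplication is enabled, the Pulsar broker performs a similar check on producer sequence IDs; here the check runs during replay so the sketch is standalone, and a resent update is counted only once.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopicStateStore {

    // One state update as it might be published to a (hypothetical) state topic.
    public static final class StateUpdate {
        final String producer;
        final long sequenceId;
        final String key;
        final long delta;

        public StateUpdate(String producer, long sequenceId, String key, long delta) {
            this.producer = producer;
            this.sequenceId = sequenceId;
            this.key = key;
            this.delta = delta;
        }
    }

    // Highest sequence ID applied so far, per producer.
    private final Map<String, Long> lastSequenceId = new HashMap<>();
    // The materialized key/value state (here: counters).
    private final Map<String, Long> table = new HashMap<>();

    // Applies an update unless its sequence ID is not higher than the last one
    // seen from the same producer, so a retried publish is not counted twice.
    public boolean apply(StateUpdate u) {
        long last = lastSequenceId.getOrDefault(u.producer, -1L);
        if (u.sequenceId <= last) {
            return false; // duplicate, e.g. resent after a timeout
        }
        lastSequenceId.put(u.producer, u.sequenceId);
        table.merge(u.key, u.delta, Long::sum);
        return true;
    }

    public long get(String key) {
        return table.getOrDefault(key, 0L);
    }

    public static void main(String[] args) {
        TopicStateStore store = new TopicStateStore();
        List<StateUpdate> log = List.of(
            new StateUpdate("fn-0", 0, "clicks", 1),
            new StateUpdate("fn-0", 1, "clicks", 2),
            new StateUpdate("fn-0", 1, "clicks", 2), // retried duplicate
            new StateUpdate("fn-0", 2, "views", 5));
        for (StateUpdate u : log) {
            store.apply(u);
        }
        System.out.println(store.get("clicks") + " " + store.get("views")); // prints 3 5
    }
}
```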
This solution will take more implementation on the user's part. @sijie is adding transaction support to Pulsar, so we can also see whether consuming a message, updating function state, and publishing message(s) can all happen in a single transaction.

GitHub link: https://github.com/apache/pulsar/discussions/19657#discussioncomment-5137526
