GitHub user lhotari edited a comment on the discussion: Is Key_Shared suitable 
for stateful stream processing of small throughput and high cardinality

> Use case is tens of thousands of IoT devices sending a reasonably small 
> amount of telemetries (say 1 per second) that need to be statefully 
> processed. On the publishing side, we have a handful of machines receiving 
> the telemetries and dumping that onto pulsar using batching. To be able to 
> consume using Key_Shared, batches must be produced using the KEY_BASED 
> batching strategy, meaning that instead of having loads of telemetries per 
> batch I end up with basically no batching - key is device ID and each device 
> produces a mere 1 msg/sec... Since the absence of batching adds a very 
> significant load on the cluster (broker & bookies) I'm wondering if this is 
> just not the appropriate use case for the feature? Or... am I doing something 
> wrong?

You are right, that it's not optimal when batching isn't used. One possible 
solution is to calculate an intermediate sharding key to reduce the cardinality 
significantly so that batching would happen. A gut feeling is that there would 
have to be fairly low number of these sharding keys so that it would actually 
be helpful in enabling batching.

For such high volume use cases, it could be better to use partitioned topics 
with failover consumers, and then have a single consumer per partition. (For 
strict ordering, it might be necessary to use exclusive subscription to 
workaround https://github.com/apache/pulsar/issues/15189)

If you happen to be using Reactive Spring & Spring Pulsar Reactive, there's an 
alternative for Key_Shared subscriptions which works with Failover 
subscriptions, retaining key order.
Reference of [Reactive Message Consumption with Spring 
Pulsar](https://docs.spring.io/spring-pulsar/docs/current/reference/reference/reactive-pulsar/reactive-message-consumption.html#reactive-concurrency):

> However, when handling messages one-by-one, concurrency can be specified to 
> increase processing throughput. Simply set the `concurrency` property on 
> `@ReactivePulsarListener`. Additionally, when `concurrency > 1` you can 
> ensure messages are ordered by key and therefore sent to the same handler by 
> setting `useKeyOrderedProcessing = "true"` on the annotation.

In pulsar-client-reactive ReactiveMessagePipelineBuilder, it's this option:
[https://github.com/apache/pulsar-client-reactive/blob/a93b096a10b9451eaec2046b346e[…]/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java](https://github.com/apache/pulsar-client-reactive/blob/a93b096a10b9451eaec2046b346e116c040e0fcd/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java#L122-L127)

Under the covers it uses Project Reactor's groupBy operator to ensure that 
processing happens in key-order. The benefit over plain failover subscription 
is that you have tunable concurrency level, which can go to 100s with low 
resource consumption. This actually better than Key_Shared subscriptions for 
certain use cases where processing itself is not costly but most of the work is 
performed in external API backends. That is the sweet spot for Reactive Spring 
and Spring Pulsar Reactive / pulsar-client-reactive. There are examples in 
https://github.com/lhotari/reactive-iot-backend-ApacheCon2021 and 
https://github.com/lhotari/reactive-pulsar-showcase of using Pulsar Reactive 
Client.

GitHub link: 
https://github.com/apache/pulsar/discussions/24044#discussioncomment-12491790

----
This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org

Reply via email to