Hi Gabor,

Thanks for sharing this idea. I think it makes a lot of sense, and the
Kafka offsets example is a very practical starting point.

A few thoughts and questions from my side:

   - Is the State Processor API the right place for these new functions?
   Or do you think it would be worth having separate helper modules, such
   as a Kafka tools module or an Iceberg tools module? That way we could
   have connector-specific utility modules and avoid coupling them with
   state processing.
   - I like the function-based approach you described. Since a "normal"
   (scalar) SQL function usually returns a single value (like ABS(-30) -> 30,
   or CONCAT('first', 'name') -> 'firstname'), I think it would also be a
   good idea to make it a table-valued function (TVF), which can return
   multiple rows, because offsets are usually a set of rows (topic,
   partition, committed, end, lag, etc.). WDYT?
      - Example:

        SELECT * FROM kafka_offsets(
            topic => 'orders',
            group_id => 'orders-app'
        );
   - Do you think these helper functions will need to accept credentials,
   e.g. via WITH options?
   - I also think we will need some guidance on avoiding jar conflicts
   (Kafka client versions, shaded artifacts). WDYT?
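
To make the TVF idea a bit more concrete, here is a minimal, hypothetical
sketch in plain Java (no Flink or Kafka APIs) of the rows such a function
could emit: one row per partition with the columns listed above, where
lag = end - committed. OffsetRow and offsetRows are made-up names, the two
input maps stand in for offsets a real implementation would fetch from
Kafka, and the numbers in main are invented sample values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class Main {

    // One output row of the hypothetical kafka_offsets(...) table function.
    public static final class OffsetRow {
        public final String topic;
        public final int partition;
        public final Long committed; // null if the group has no committed offset
        public final long end;       // log end offset of the partition
        public final Long lag;       // end - committed, or null if nothing is committed

        OffsetRow(String topic, int partition, Long committed, long end) {
            this.topic = topic;
            this.partition = partition;
            this.committed = committed;
            this.end = end;
            this.lag = committed == null ? null : end - committed;
        }

        @Override
        public String toString() {
            return topic + " | " + partition + " | " + committed + " | " + end + " | " + lag;
        }
    }

    // Expands per-partition offsets into one row per partition, ordered by partition.
    // A real implementation would fetch both maps from Kafka; here they are plain inputs.
    public static List<OffsetRow> offsetRows(
            String topic, Map<Integer, Long> committed, Map<Integer, Long> endOffsets) {
        List<OffsetRow> rows = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : new TreeMap<>(endOffsets).entrySet()) {
            int partition = e.getKey();
            rows.add(new OffsetRow(topic, partition, committed.get(partition), e.getValue()));
        }
        return rows;
    }

    public static void main(String[] args) {
        // Invented sample values for topic 'orders'; partition 2 has no committed offset.
        Map<Integer, Long> committed = Map.of(0, 120L, 1, 80L);
        Map<Integer, Long> endOffsets = Map.of(0, 150L, 1, 80L, 2, 10L);
        for (OffsetRow row : offsetRows("orders", committed, endOffsets)) {
            System.out.println(row);
        }
    }
}
```

Reporting a missing committed offset as NULL (rather than 0) seems safer,
since what the group would actually consume from then depends on its reset
policy.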

cheers,
Zsombor

Gabor Somogyi <gabor.g.somo...@gmail.com> wrote on Mon, 18 Aug 2025,
13:23:

> Hi all,
>
> SQL modules like the State Processor API could potentially expose SQL
> functions that come from external connectors like Kafka, Iceberg, etc.
> The functions would be discovered dynamically rather than hardcoded into
> the module, because we don't intend to add direct external connector
> dependencies to Flink itself (here, the State Processor API).
>
> One of the most obvious examples of such functionality is getting the
> Kafka offsets.
> The intended end-user interaction would look like the following:
> - Add state processor API jar to the classpath
> - Add Kafka connector jar to the classpath
> - LOAD MODULE state
> - SELECT * FROM get_kafka_offsets(...)
>
> Nothing very complex would happen in the background: a service loader
> would simply load the function definitions dynamically in the state
> module. It is worth highlighting that there is no intention to change any
> existing APIs; this would just be added as a new feature.
>
> Please share your thoughts on this.
>
> BR,
> G
>
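
The service-loader discovery described in the quoted proposal can be
sketched with plain java.util.ServiceLoader. This is a minimal illustration
under stated assumptions, not Flink code: StateFunctionProvider,
KafkaOffsetsProvider, discoverFunctions and classLoaderWithConnector are
hypothetical names, and a temporary directory holding only a
META-INF/services registration file stands in for the Kafka connector jar
being added to the classpath.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class Main {

    // Hypothetical SPI (not an existing Flink interface): a connector jar would implement it
    // and register the implementation in META-INF/services/<binary name of this interface>.
    public interface StateFunctionProvider {
        String functionName();
    }

    // Hypothetical provider that would live in the Kafka connector jar.
    public static class KafkaOffsetsProvider implements StateFunctionProvider {
        public KafkaOffsetsProvider() {}

        @Override
        public String functionName() {
            return "get_kafka_offsets";
        }
    }

    // Roughly what loading the state module could do in the background:
    // collect every provider visible to the given class loader.
    public static List<String> discoverFunctions(ClassLoader classLoader) {
        List<String> names = new ArrayList<>();
        for (StateFunctionProvider provider :
                ServiceLoader.load(StateFunctionProvider.class, classLoader)) {
            names.add(provider.functionName());
        }
        return names;
    }

    // Simulates adding the connector jar: a directory containing only the
    // META-INF/services registration file that names KafkaOffsetsProvider.
    public static URLClassLoader classLoaderWithConnector(Path dir) throws IOException {
        Path services = dir.resolve("META-INF").resolve("services");
        Files.createDirectories(services);
        Files.write(
                services.resolve(StateFunctionProvider.class.getName()),
                KafkaOffsetsProvider.class.getName().getBytes(StandardCharsets.UTF_8));
        return new URLClassLoader(new URL[] {dir.toUri().toURL()}, Main.class.getClassLoader());
    }

    public static void main(String[] args) throws IOException {
        // Without the connector registration, no providers are found.
        System.out.println("without connector: " + discoverFunctions(Main.class.getClassLoader()));

        // With the registration visible, the provider is discovered dynamically.
        Path dir = Files.createTempDirectory("connector");
        try (URLClassLoader loader = classLoaderWithConnector(dir)) {
            System.out.println("with connector: " + discoverFunctions(loader));
        }
    }
}
```

In a real setup the provider class and its registration file would both
ship in the connector jar, so the module itself would not need a
compile-time dependency on the connector; it would only expose whatever
functions it finds at load time.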
