Hi Gabor!

I appreciate your answers; they clarified a lot of details that I had
misunderstood from your first email.

Now I understand that the State Processor API is only the service loader
entry point, and the actual TableFunction implementations will stay in the
connector jars, which indeed avoids the need for extra modules. Thank you
for the clarification on TF vs TVF as well, and for pointing me to the
savepoint_metadata example.

Overall, I think your approach is pragmatic and keeps things simple.
cheers,
Zsombor

Gabor Somogyi <gabor.g.somo...@gmail.com> wrote (on Mon, Aug 18, 2025,
22:27):

> Hi Zsombor,
>
> Thanks for your contribution, please see my answers inline.
>
> G
>
>
> On Mon, Aug 18, 2025 at 7:24 PM Zsombor Chikán <zsomborchi...@gmail.com>
> wrote:
>
> > Hi Gabor,
> >
> > Thanks for sharing this idea, I think it makes a lot of sense, and the
> > Kafka offsets example is a very practical starting point.
> >
> > A few thoughts/questions from my side:
> >
> >    - is the State Processor API the proper place to put these new
> >    functions? Or do you think it's worth having separate helper modules,
> >    like a Kafka tools module or an Iceberg tools module? Maybe with this
> >    we could have connector-specific utility modules and avoid coupling
> >    with state processing
> >
> I think some things are mixed up here. The state processor API just
> calls something like this:
> *ServiceLoader.load(DynamicBuiltInFunctionDefinitionFactory.class)*.
> Practically, it loads function definitions and the module just gives them
> back in *listFunctions*. So the state processor API has no helper
> functions inside. *TableFunction* instances will live in the Kafka and
> Iceberg connectors.
> Creating additional modules would be overkill, I'm pretty sure.
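>
> To make the mechanism concrete, the loading side could look something
> like the following minimal sketch (the interface shape, method signature
> and helper name are illustrative assumptions, not a final API):
>
>     import java.util.ArrayList;
>     import java.util.List;
>     import java.util.ServiceLoader;
>     import org.apache.flink.table.functions.BuiltInFunctionDefinition;
>
>     // Illustrative factory interface, implemented inside connector jars.
>     public interface DynamicBuiltInFunctionDefinitionFactory {
>         // The function definitions the connector wants to expose.
>         List<BuiltInFunctionDefinition> listFunctions();
>     }
>
>     // State module side: discover all factories on the classpath and
>     // collect their functions, with no compile-time connector dependency.
>     static List<BuiltInFunctionDefinition> loadDynamicFunctions() {
>         List<BuiltInFunctionDefinition> functions = new ArrayList<>();
>         ServiceLoader.load(DynamicBuiltInFunctionDefinitionFactory.class)
>                 .forEach(f -> functions.addAll(f.listFunctions()));
>         return functions;
>     }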
>
> >    - I like the function approach you described, and since a "normal"
> >    (scalar) SQL function usually returns a single value (like
> >    SUM(10, 20) -> 30, or CONCAT('first', 'name') -> 'firstname'), I
> >    think it would also be a good idea to make it a TVF, which can
> >    return multiple rows (because offsets are usually a set of rows ->
> >    topic, partition, committed, end, lag, etc). wdyt?
> >       - example:
> >         SELECT * FROM kafka_offsets(
> >           topic => 'orders',
> >           group_id => 'orders-app'
> >         );
> >
> *TableFunction* instances can return multiple rows and they already work
> well, like *savepoint_metadata* [1].
> When we chose TF instead of TVF, there was a huge discussion where we
> considered many factors.
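>
> For illustration, a connector-side function could look something like the
> following minimal sketch, in the style of *SavepointMetadataTableFunction*
> [1] (class name and row layout here are made up for the example):
>
>     import org.apache.flink.table.functions.TableFunction;
>     import org.apache.flink.types.Row;
>
>     // Illustrative only: each collect() call emits one result row,
>     // e.g. one row per topic partition.
>     public class KafkaOffsetsTableFunction extends TableFunction<Row> {
>         public void eval(String topic, String groupId) {
>             collect(Row.of(topic, 0, 42L)); // topic, partition, offset
>             collect(Row.of(topic, 1, 17L));
>         }
>     }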
>
>
> >    - Do you think these helper functions need credentials via WITH
> >    options?
> >
>
> Let's say somebody implements a *TableFunction* in Kafka. This question
> can only be answered based on what the function does.
> In the case of Kafka offsets I don't intend to add any such options,
> because the code will use the Flink FS in the background.
>
>
> >    - And I think we will need some guidance to avoid jar conflicts
> >    (Kafka client versions, shaded artifacts), wdyt?
> >
>
> Not sure what kind of jar conflict we could have here. The complete
> Flink side implementation contains the following:
> - An interface which can be loaded by the service loader
> - The actual service loader call in the module
> - *TableFunction* implementations, with a tiny service loader
> registration class, embedded inside the existing connector jars
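>
> The "tiny service loader registration" is just the standard Java
> mechanism: a one-line text file in the connector jar, named after the
> fully qualified factory interface and containing the implementing class
> (both names below are illustrative placeholders, not real ones):
>
>     # META-INF/services/org.apache.flink.DynamicBuiltInFunctionDefinitionFactory
>     org.apache.flink.connector.kafka.table.KafkaFunctionDefinitionFactory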
>
> Hope this helps. If there are further fundamental questions I can open
> the Flink side PR, because it's not as complex as it sounds.
>
> [1]
>
> https://github.com/apache/flink/blob/4aac6902dd012fa70fbc50dc5ae79921b991c21a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointMetadataTableFunction.java#L31
>
>
> > cheers,
> > Zsombor
> >
> > Gabor Somogyi <gabor.g.somo...@gmail.com> wrote (on Mon, Aug 18, 2025,
> > 13:23):
> >
> > > Hi all,
> > >
> > > SQL modules like the state processor API could potentially add SQL
> > > functions which come from external connectors like Kafka, Iceberg,
> > > etc.
> > > The dynamic nature, instead of hardcoding functions into the module,
> > > comes from the fact that we don't intend to add direct external
> > > connector dependencies to Flink itself (the state processor API here).
> > >
> > > One of the most obvious examples of such functionality is getting
> > > the Kafka offsets.
> > > The intended end-user interaction would look like the following:
> > > - Add state processor API jar to the classpath
> > > - Add Kafka connector jar to the classpath
> > > - LOAD MODULE state
> > > - SELECT * FROM get_kafka_offsets(...)
> > >
> > > In the background nothing super complex would happen: a service
> > > loader can load function definitions dynamically in the state module.
> > > It's worth highlighting that there is no intention to change any
> > > actual APIs, just to add this as a new feature.
> > >
> > > Please share your thoughts on this.
> > >
> > > BR,
> > > G
> > >
> >
>