Hi Swapna and Geng,

Thanks for building the POC. I have some suggestions after reading the POC
and the FLIP again.

1. I think we are missing `ModelProvider.Context` when creating the Python
predict function [1]. This context can carry runtime config, such as a
timeout, that is passed to the `ML_PREDICT` function as a map. Maybe you can
add it to `getPythonPredictFunction` in `PythonPredictRuntimeProvider`.

2. `GenericPythonModelProviderFactory` seems to be a default Python model
provider factory provided by Flink. Do we want to add an abstraction such as
`PythonModelProviderFactory` above it, so that
`GenericPythonModelProviderFactory` is one implementation of it? Users could
then implement other Python provider factories. It could have an abstract
method called `createPythonPredictRuntimeProvider`.

3. In your FLIP [2], `PythonPredictFunction#createPythonFunction` uses a
`modelContext`, but I don't see where it comes from. It looks like it comes
from the constructor in your code.

4. I see that model information such as the schema is only passed to the
`GenericPythonPredictFunction` constructor. It should be provided in a
generic way to all `PythonPredictFunction`s.
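
To illustrate points 1 and 4 from the Python side, here is a rough sketch of
what I have in mind. It is only an illustration: `PredictRuntimeContext` as
written here, `PredictFunction`, `EchoPredictFunction`, and all field names
are hypothetical and do not exist in PyFlink or the POC. The idea is that one
context object carries the model options, the runtime config from
`ML_PREDICT`, and the schemas, and every predict function receives it in
`open`.

```python
from dataclasses import dataclass
from typing import Dict, Iterator, List, Tuple


@dataclass(frozen=True)
class PredictRuntimeContext:
    """Hypothetical context object; not an existing PyFlink class."""

    model_options: Dict[str, str]        # options from the model DDL
    runtime_config: Dict[str, str]       # map passed through ML_PREDICT, e.g. timeout
    input_schema: List[Tuple[str, str]]  # (column name, SQL type) pairs
    output_schema: List[Tuple[str, str]]


class PredictFunction:
    """Hypothetical base class that keeps the context for every subclass."""

    def open(self, context: PredictRuntimeContext) -> None:
        self.context = context

    def predict(self, row: Tuple) -> Iterator[Tuple]:
        raise NotImplementedError


class EchoPredictFunction(PredictFunction):
    """Toy model: echoes the first input column and its length."""

    def open(self, context: PredictRuntimeContext) -> None:
        super().open(context)
        # Runtime config arrives as strings; fall back to 30 seconds.
        self.timeout_s = float(context.runtime_config.get("timeout", "30"))

    def predict(self, row: Tuple) -> Iterator[Tuple]:
        text = row[0]
        yield text, len(text)


context = PredictRuntimeContext(
    model_options={"model": "Qwen/Qwen3-0.6B"},
    runtime_config={"timeout": "5"},
    input_schema=[("content", "STRING")],
    output_schema=[("response", "STRING"), ("length", "INT")],
)
fn = EchoPredictFunction()
fn.open(context)
print(fn.timeout_s, list(fn.predict(("hello",))))
```

With this shape, the schema and runtime config are not tied to one
constructor, so any predict function can read them the same way.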

Thanks,
Hao



[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/ml/ModelProvider.java#L47

On Tue, Dec 16, 2025 at 9:25 AM Swapna Marru <[email protected]>
wrote:

> Hi Geng,
>
> Thanks for the details.
>
> I just focussed on Thread Mode for the first POC. The idea was to extend to
> Process Mode later, after the overall design was approved.
> I will go through your implementation, but from my previous checks, I saw
> there were some limitations around sending context specific to each
> userDefinedFunction in Process Mode, as the proto carries
> `PythonFunctionInfo[] userDefinedFunctions` and a single runtime context
> shared across all of them.
> We would need a separate model config passed for each of the functions.
> The proto could be extended, but I haven't drilled into it yet.
>
>
> >> I notice that your POC concentrates on the Thread Mode instead of the
> Process Mode.
>
> On Wed, Dec 10, 2025 at 6:38 PM Geng Biao <[email protected]> wrote:
>
> > Hi Swapna,
> > One more follow-up about the `PredictRuntimeContext`: I notice that your
> > POC concentrates on the Thread Mode instead of the Process Mode. I'm not
> > sure if the reason is that directly calling Python methods like
> > `set_model_config` in Thread Mode is more convenient, but I should mention
> > that in Process Mode, it might not be ideal to directly call a Python
> > method from the Java side. If it's just for passing model configurations,
> > perhaps we can refer to my current implementation
> > <https://github.com/bgeng777/flink/blob/b029aaca9163a4691d8ce4fcfc9a1a4d6cc67527/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java#L161>
> > and reuse the `_job_parameters` in FunctionContext. This way, we might be
> > able to directly reuse the existing UDF's `open` method with
> > FunctionContext.
> >
> > Best,
> > Biao Geng
> >
> > > On Dec 10, 2025, at 01:11, Swapna Marru <[email protected]> wrote:
> > >
> > > Hi
> > >
> > > Thanks Geng Biao for the interest and the POC implementation. Sorry, I
> > > was away for a while and am in between some other things currently.
> > > I will be able to look back at this starting next week.
> > > Meanwhile, I would like to share my previous POC implementation:
> > > https://github.com/apache/flink/commit/bb2d5c4aa096a4eb3a89adb6a5bfecd8e87d3262
> > >
> > > I will take a look at your implementation.
> > > In previous discussions with ShengKai, one more thing we wanted to
> > > explore is the flexibility of having the model provider factory in
> > > Python as well, instead of having it in Java as proposed in the FLIP.
> > > This is something I haven't yet had time to dig deeper into.
> > >
> > > Yes, I agree on this.
> > > >>> I notice that the parameter `model-directory-path` is introduced. It
> > > >>> should work, but I see that some popular Python model libraries like
> > > >>> `transformers` and `vllm` tend to directly use the parameter name
> > > >>> `model`, which could be a local path or a simple name like
> > > >>> `Qwen/Qwen3-0.6B`. Then they check if `model` is a path or a simple
> > > >>> name and automatically download it to a local cache directory if
> > > >>> necessary. Considering use cases based on these libraries, maybe
> > > >>> `model-directory-path` can be renamed to `model`, which can be either
> > > >>> a real path or a simple model name.
> > >
> > > Yes, PredictRuntimeContext is more generic and extensible. I will
> > > update the FLIP soon.
> > > >>> There is a `set_model_config` in the Python PredictFunction
> > > >>> interface. I find that in the previous discussion, you are
> > > >>> considering passing PredictRuntimeContext in `open`. I do agree with
> > > >>> this choice as well, which makes the logic clearer, and users can
> > > >>> just use the configs in the `open` method to init their model. I just
> > > >>> find that the FLIP still has the `set_model_config`, so I want to
> > > >>> double check it here.
> > >
> > > -Thanks,
> > > M.Swapna
> > >
> > >
> > > On Tue, Dec 2, 2025 at 4:48 AM Geng Biao <[email protected]> wrote:
> > >
> > >> Hi folks,
> > >>
> > >> I am writing to follow up on this FLIP. I recently implemented a PoC
> > >> for ML_PREDICT using a local Python model, based on PyFlink's current
> > >> UDTF framework.
> > >> The code can be found here:
> > >> https://github.com/bgeng777/flink/commit/8eb002944ee8ce863680923ff53622eb10813777
> > >> I believe the proposal can work and want to share some findings:
> > >> My implementation follows most of the SQL section design of the FLIP.
> > >> The only difference is that I use the 'model' option instead of
> > >> 'model-directory-path' (see my previous reply in this thread for
> > >> details), which I think is expressive enough.
> > >>
> > >> In the Python part, as ML_PREDICT is a specialized UDTF and PyFlink
> > >> also supports UDTFs, it is natural to directly reuse the udtf
> > >> <https://github.com/apache/flink/blob/master/flink-python/pyflink/table/udf.py#L692>
> > >> decorator to define a Python predict class. One point is that, as the
> > >> SQL primitive already defines the OUTPUT schema, we may need to enhance
> > >> PyFlink's udtf interface so that users can omit `result_types` and
> > >> directly use the definition from the DDL. Another point is that we need
> > >> to pass some model parameters (e.g. temperature, GPU/CPU settings) to
> > >> the Python process, so we may need to enhance PyFlink so that users can
> > >> read the model parameters directly in the UDTF. After introducing such
> > >> improvements, we can define the Python logic like this:
> > >>
> > >> ```
> > >> from pyflink.table.udf import TableFunction, udtf
> > >> from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
> > >>
> > >>
> > >> class HuggingFaceFunc(TableFunction):
> > >>     def __init__(self):
> > >>         self.model = None
> > >>         self.tokenizer = None
> > >>         self.pipeline = None
> > >>
> > >>     def open(self, runtime_context):
> > >>         # "model" is a local path or a simple name like Qwen/Qwen3-0.6B.
> > >>         model_dir = runtime_context.get_job_parameter("model", None)
> > >>         ...
> > >>         self.tokenizer = AutoTokenizer.from_pretrained(
> > >>             model_dir, trust_remote_code=True
> > >>         )
> > >>
> > >>         # `device` is presumably set in the elided part above.
> > >>         self.model = AutoModelForCausalLM.from_pretrained(
> > >>             model_dir, device_map=device
> > >>         )
> > >>         self.pipeline = pipeline(
> > >>             "text-generation", model=self.model, tokenizer=self.tokenizer
> > >>         )
> > >>         assert self.model is not None
> > >>
> > >>     def eval(self, content: str, comment: str):
> > >>         output = self.pipeline(content)[0]["generated_text"]
> > >>         yield output, len(output)
> > >>
> > >>
> > >> # No result_types here: this relies on the enhancement proposed above.
> > >> HuggingFaceModelUDTF = udtf(HuggingFaceFunc())
> > >> ```
> > >>
> > >> I have implemented two Python predict classes to verify this design,
> > >> one using HuggingFace and another based on vLLM (the code can be found
> > >> here
> > >> <https://github.com/bgeng777/flink/blob/bgeng777/python-ml-predict-poc/flink-end-to-end-tests/flink-python-test/python/huggingface_udtf.py>).
> > >> So to conclude, I am wondering if we really need a new Python `class
> > >> PredictFunction(TableFunction)`.
> > >>
> > >> Thanks for reading and looking forward to your thoughts.
> > >>
> > >> Best,
> > >> Biao Geng
> > >>
> > >>
> > >>> On Nov 21, 2025, at 19:28, Geng Biao <[email protected]> wrote:
> > >>>
> > >>> Hi Swapna,
> > >>>
> > >>> Thanks for the FLIP and sorry for the late reply. After reading it, I
> > >> have some comments about the proposal.
> > >>>
> > >>> 1. I notice that the parameter `model-directory-path` is introduced. It
> > >>> should work, but I see that some popular Python model libraries like
> > >>> `transformers` and `vllm` tend to directly use the parameter name
> > >>> `model`, which could be a local path or a simple name like
> > >>> `Qwen/Qwen3-0.6B`. Then they check if `model` is a path or a simple
> > >>> name and automatically download it to a local cache directory if
> > >>> necessary. Considering use cases based on these libraries, maybe
> > >>> `model-directory-path` can be renamed to `model`, which can be either
> > >>> a real path or a simple model name.
> > >>>
> > >>> 2. There is a `set_model_config` in the Python PredictFunction
> > >>> interface. I find that in the previous discussion, you are considering
> > >>> passing PredictRuntimeContext in `open`. I do agree with this choice as
> > >>> well, which makes the logic clearer, and users can just use the configs
> > >>> in the `open` method to init their model. I just find that the FLIP
> > >>> still has the `set_model_config`, so I want to double check it here.
> > >>>
> > >>>
> > >>>
> > >>> Thanks for your time!
> > >>>
> > >>> Best,
> > >>> Biao Geng
> > >>>
> > >>>> On Sep 22, 2025, at 23:36, Swapna Marru <[email protected]> wrote:
> > >>>>
> > >>>> Hi Devs,
> > >>>>
> > >>>> I am interested in learning more about the MODEL, ML_PREDICT, and
> > >>>> ML_EVALUATE functionality added in the following FLIP.
> > >>>>
> > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL
> > >>>>
> > >>>> I see the original FLIP has extensibility to local model providers in
> > >>>> Flink.
> > >>>>
> > >>>> Is there a way to do pluggable local model providers in Python? For
> > >>>> example, generating embeddings using Sentence Transformer models
> > >>>> running locally in Flink.
> > >>>>
> > >>>> An option could be to introduce a Model Provider factory
> > >>>> implementation in Java that internally uses a predict function in
> > >>>> Python. But I see this puts a lot of work related to Java-to-Python
> > >>>> communication/translation inside the provider.
> > >>>>
> > >>>> Something like PythonRuntimeProvider, alongside PredictRuntimeProvider
> > >>>> / AsyncRuntimeProvider, which can handle Java -> Python translations
> > >>>> out of the box, would be helpful to de-duplicate that effort.
> > >>>>
> > >>>> Can you please point me to any existing discussions related to this?
> > >>>> Or any other ways to achieve the same? Please share your thoughts.
> > >>>>
> > >>>>
> > >>>> -Thanks,
> > >>>>
> > >>>> Swapna Marru
> >
> >
>
