Thanks Hao and Geng. I was waiting to finish a high-level refactored POC so that I could get feedback on the redesign before we discuss the finer details.
Here is the new POC code: https://github.com/swapna267/flink/pull/1/files

It relies on ModelHandlerFactory and ModelHandler, both written entirely in Python, and on Python entry points <https://github.com/swapna267/flink/blob/8c002c964378415912a04685bff263f62bbd1c37/flink-python/pyflink/table/ml/__init__.py#L53-L56> to register the factories dynamically.

The SQL would now look like:

    CREATE MODEL my_pytorch_model
    INPUT ()
    OUTPUT ()
    LANGUAGE PYTHON
    WITH ('model.type'='pytorch' ...);

When it's LANGUAGE PYTHON, it just invokes an internal UDTF <https://github.com/swapna267/flink/blob/8c002c964378415912a04685bff263f62bbd1c37/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMLPredictTableFunction.java#L221>, which handles ModelHandler initialization and inference. I will add the class diagram and more details to the design document too.

In the POC, I am using a Python UDTF and batching the data inside the UDTF. This may not be efficient. We need to explore more efficient ways to do this, maybe something like Vectorized UDFs <https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/python/table/udfs/vectorized_python_udfs/>. Please share any insights you have on this.

-Thanks,
M.Swapna

On Wed, Jan 7, 2026 at 4:12 AM Geng Biao <[email protected]> wrote:

> Hi folks,
>
> Hope this email finds you well in 2026. +1 for Hao's 4 points, especially
> for introducing an abstract PythonModelProvider to encapsulate some common
> logic, making it easier for developers and users to implement custom,
> user-friendly Python model providers.
>
> Looking forward to making the FLIP more concrete together.
> Best,
> Biao Geng
>
>
> > On Dec 17, 2025, at 03:01, Hao Li via dev <[email protected]> wrote:
> >
> > Hi Swapna and Geng,
> >
> > Thanks for building the POC. I have some suggestions after reading the POC
> > and the FLIP again.
> >
> > 1. I think we are missing `ModelProvider.Context` when creating the Python
> > predict function.
> > [1]. This context can contain runtime config, such as a timeout, passed
> > through the ml_predict function as a map. Maybe you can add
> > it in `getPythonPredictFunction` in `PythonPredictRuntimeProvider`.
> >
> > 2. `GenericPythonModelProviderFactory` seems to be a default Flink-provided
> > Python model provider factory. Do we want to add an abstraction such as
> > `PythonModelProviderFactory` above it, so that
> > `GenericPythonModelProviderFactory` is an implementation of it? Users could
> > then implement other Python provider factories. It can have an abstract method
> > called `createPythonPredictRuntimeProvider`.
> >
> > 3. In your FLIP [2], in the `PythonPredictFunction#createPythonFunction`
> > function, there's a `modelContext` and I don't see where it comes from. It
> > looks like it comes from the constructor in your code.
> >
> > 4. I see that model information such as the schema is only in the
> > `GenericPythonPredictFunction` constructor; it should be provided in a
> > generic way to all PythonPredictFunctions.
> >
> > Thanks,
> > Hao
> >
> >
> > [1]
> > https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/ml/ModelProvider.java#L47
> >
> > On Tue, Dec 16, 2025 at 9:25 AM Swapna Marru <[email protected]>
> > wrote:
> >
> >> Hi Geng,
> >>
> >> Thanks for the details.
> >>
> >> I just focused on Thread Mode for the first POC. The idea was to extend to
> >> Process Mode later, after the overall design was approved.
> >> I will go through your implementation, but from my previous checks, I saw
> >> there were some limitations around sending context specific to each
> >> userDefinedFunction in Process Mode,
> >> as the proto carries *PythonFunctionInfo[] userDefinedFunctions* and a single
> >> runtime context shared across all of them.
> >> We would need a separate model config passed for each of the functions.
> >> The proto could be extended, but I haven't drilled into it yet.
> >>
> >>
> >>>> I notice that your POC concentrates on the Thread Mode instead of the
> >>>> Process Mode.
> >>
> >> On Wed, Dec 10, 2025 at 6:38 PM Geng Biao <[email protected]> wrote:
> >>
> >>> Hi Swapna,
> >>> One more follow-up about the `PredictRuntimeContext`: I notice that your
> >>> POC concentrates on the Thread Mode instead of the Process Mode. I'm not
> >>> sure if the reason is that directly calling Python methods like
> >>> `set_model_config` in Thread Mode is more convenient, but I should mention
> >>> that in Process Mode, it might not be ideal to directly call a Python
> >>> method from the Java side. If it's just for passing model configurations,
> >>> perhaps we can refer to my current implementation
> >>> <https://github.com/bgeng777/flink/blob/b029aaca9163a4691d8ce4fcfc9a1a4d6cc67527/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java#L161>
> >>> and reuse the _job_parameters in FunctionContext. This way, we might be
> >>> able to directly reuse the existing UDF's `open` method with
> >>> FunctionContext.
> >>>
> >>> Best,
> >>> Biao Geng
> >>>
> >>>> On Dec 10, 2025, at 01:11, Swapna Marru <[email protected]> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>> Thanks, Geng Biao, for the interest and the POC implementation. Sorry, I was
> >>>> away for a while and am in the middle of some other work currently.
> >>>> I will be able to get back to this starting next week.
> >>>> Meanwhile, I would like to share my previous POC implementation:
> >>>> https://github.com/apache/flink/commit/bb2d5c4aa096a4eb3a89adb6a5bfecd8e87d3262
> >>>>
> >>>>
> >>>> I will take a look at your implementation.
> >>>> In previous discussions with ShengKai, one more thing we wanted to explore
> >>>> is the flexibility of having the model provider factory in Python as well,
> >>>> instead of having it in Java as proposed in the FLIP.
> >>>> This is something I haven't yet had time to dig into.
> >>>>
> >>>> Yes, I agree on this.
> >>>>>> I notice that the parameter `model-directory-path` is introduced. It
> >>>>>> should work, but I see that some popular Python model libraries like
> >>>>>> `transformers` and `vllm` tend to directly use the parameter name `model`,
> >>>>>> which could be a local path or a simple name like `Qwen/Qwen3-0.6B`. Then
> >>>>>> they check if `model` is a path or a simple name and automatically download
> >>>>>> it to a local cache directory if necessary. Considering use cases based on
> >>>>>> these libraries, maybe `model-directory-path` can be renamed to `model`,
> >>>>>> which can be either a real path or a simple model name.
> >>>>
> >>>> Yes, PredictRuntimeContext is more generic and is extensible. I will
> >>>> update the FLIP soon.
> >>>>>> There is a `set_model_config` in the Python PredictFunction interface.
> >>>>>> I find that in the previous discussion, you are considering passing
> >>>>>> PredictRuntimeContext in `open`. I do agree with this choice as well, which
> >>>>>> makes the logic clearer, and users can just use the configs in the `open`
> >>>>>> method to init their model. I just find that the FLIP still has the
> >>>>>> `set_model_config`, so I just want to double-check it here.
> >>>>
> >>>> -Thanks,
> >>>> M.Swapna
> >>>>
> >>>>
> >>>> On Tue, Dec 2, 2025 at 4:48 AM Geng Biao <[email protected]> wrote:
> >>>>
> >>>>> Hi folks,
> >>>>>
> >>>>> I am writing to follow up on this FLIP. I recently implemented a PoC for
> >>>>> ML_PREDICT using a local Python model, based on the current PyFlink udtf
> >>>>> framework.
> >>>>> Code can be found here:
> >>>>> https://github.com/bgeng777/flink/commit/8eb002944ee8ce863680923ff53622eb10813777
> >>>>> I believe the proposal can work and want to share some findings.
> >>>>> My implementation follows most of the SQL section design of the FLIP. The only
> >>>>> difference is that I use the 'model' option instead of
> >>>>> 'model-directory-path' (see my previous reply in the thread for details),
> >>>>> which I think is expressive enough.
> >>>>>
> >>>>> In the Python part, as ML_PREDICT is a specialized UDTF and PyFlink
> >>>>> also supports UDTFs, it is natural to directly reuse the udtf
> >>>>> <https://github.com/apache/flink/blob/master/flink-python/pyflink/table/udf.py#L692>
> >>>>> decorator to define a Python predict class. One point is that, as the SQL
> >>>>> primitive has already defined the OUTPUT schema, we may need to enhance
> >>>>> PyFlink's udtf interface to allow users to use it without specifying
> >>>>> `result_types` and directly use the definition from the DDL. Another point
> >>>>> is that we need to pass some model parameters (e.g. temperature, GPU/CPU
> >>>>> settings) to the Python process, so we may need to enhance PyFlink to support
> >>>>> such usage so that users can directly get the model parameters in the UDTF.
> >>>>> After introducing such improvements, we can define the Python logic like this:
> >>>>>
> >>>>> ```
> >>>>> from pyflink.table.udf import TableFunction, udtf
> >>>>> from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
> >>>>>
> >>>>>
> >>>>> class HuggingFaceFunc(TableFunction):
> >>>>>     def __init__(self):
> >>>>>         self.model = None
> >>>>>         self.tokenizer = None
> >>>>>         self.pipeline = None
> >>>>>
> >>>>>     def open(self, runtime_context):
> >>>>>         # Read the 'model' option (local path or model name) from the job parameters.
> >>>>>         model_dir = runtime_context.get_job_parameter("model", None)
> >>>>>         ...
> >>>>>         self.tokenizer = AutoTokenizer.from_pretrained(
> >>>>>             model_dir, trust_remote_code=True
> >>>>>         )
> >>>>>
> >>>>>         # `device` is assumed to be set in the elided part above.
> >>>>>         self.model = AutoModelForCausalLM.from_pretrained(
> >>>>>             model_dir, device_map=device
> >>>>>         )
> >>>>>         self.pipeline = pipeline(
> >>>>>             "text-generation", model=self.model, tokenizer=self.tokenizer
> >>>>>         )
> >>>>>         assert self.model is not None
> >>>>>
> >>>>>     def eval(self, content: str, comment: str):
> >>>>>         # Emit the generated text together with its length.
> >>>>>         output = self.pipeline(content)[0]["generated_text"]
> >>>>>         yield output, len(output)
> >>>>>
> >>>>>
> >>>>> HuggingFaceModelUDTF = udtf(HuggingFaceFunc())
> >>>>> ```
> >>>>>
> >>>>> I have implemented 2 Python predict classes, one using HuggingFace and
> >>>>> another based on vLLM, to verify this design (code can be found here
> >>>>> <https://github.com/bgeng777/flink/blob/bgeng777/python-ml-predict-poc/flink-end-to-end-tests/flink-python-test/python/huggingface_udtf.py>).
> >>>>> So to conclude, I am wondering if we really need a new Python `class
> >>>>> PredictFunction(TableFunction)`.
> >>>>>
> >>>>> Thanks for reading and looking forward to your thoughts.
> >>>>>
> >>>>> Best,
> >>>>> Biao Geng
> >>>>>
> >>>>>
> >>>>>> On Nov 21, 2025, at 19:28, Geng Biao <[email protected]> wrote:
> >>>>>>
> >>>>>> Hi Swapna,
> >>>>>>
> >>>>>> Thanks for the FLIP and sorry for the late reply. After reading it, I
> >>>>>> have some comments about the proposal.
> >>>>>>
> >>>>>> 1. I notice that the parameter `model-directory-path` is introduced. It
> >>>>>> should work, but I see that some popular Python model libraries like
> >>>>>> `transformers` and `vllm` tend to directly use the parameter name `model`,
> >>>>>> which could be a local path or a simple name like `Qwen/Qwen3-0.6B`. Then
> >>>>>> they check if `model` is a path or a simple name and automatically download
> >>>>>> it to a local cache directory if necessary.
> >>>>>> Considering use cases based on
> >>>>>> these libraries, maybe `model-directory-path` can be renamed to `model`,
> >>>>>> which can be either a real path or a simple model name.
> >>>>>>
> >>>>>> 2. There is a `set_model_config` in the Python PredictFunction
> >>>>>> interface. I find that in the previous discussion, you are considering
> >>>>>> passing PredictRuntimeContext in `open`. I do agree with this choice as
> >>>>>> well, which makes the logic clearer, and users can just use the configs in
> >>>>>> the `open` method to init their model. I just find that the FLIP still has
> >>>>>> the `set_model_config`, so I just want to double-check it here.
> >>>>>>
> >>>>>>
> >>>>>> Thanks for your time!
> >>>>>>
> >>>>>> Best,
> >>>>>> Biao Geng
> >>>>>>
> >>>>>>> On Sep 22, 2025, at 23:36, Swapna Marru <[email protected]> wrote:
> >>>>>>>
> >>>>>>> Hi Devs,
> >>>>>>>
> >>>>>>> I am interested in learning more about the MODEL, ML_PREDICT, and
> >>>>>>> ML_EVALUATE functionalities added in the following FLIP.
> >>>>>>>
> >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL
> >>>>>>>
> >>>>>>> I see the original FLIP has extensibility to local model providers in
> >>>>>>> Flink.
> >>>>>>>
> >>>>>>> Is there a way to do pluggable local model providers in Python? Like, say,
> >>>>>>> generate embeddings using Sentence Transformer models running locally in
> >>>>>>> Flink.
> >>>>>>>
> >>>>>>> An option could be to introduce a Model Provider factory implementation in
> >>>>>>> Java that internally uses a predict function in Python. But I see this
> >>>>>>> puts a lot of work related to Java-to-Python communication/translation
> >>>>>>> inside the provider.
> >>>>>>>
> >>>>>>> Something like PythonRuntimeProvider, alongside PredictRuntimeProvider /
> >>>>>>> AsyncRuntimeProvider, which can handle Java -> Python translations out of
> >>>>>>> the box would be helpful to de-duplicate that effort.
> >>>>>>>
> >>>>>>> Can you please point me to any existing discussions related to this?
> >>>>>>> Or are there any other ways to achieve the same? Please share your thoughts.
> >>>>>>>
> >>>>>>> -Thanks,
> >>>>>>>
> >>>>>>> Swapna Marru
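
For reference, the in-UDTF batching mentioned at the top of this thread can be sketched in plain Python as below. This is only an illustration under stated assumptions, not the POC's code: `BatchingPredictor`, `predict_batch`, `batch_size`, and `finish` are hypothetical names, and the PyFlink `TableFunction` base class is deliberately left out so the snippet runs standalone. The `eval` method mirrors the generator style of a Python UDTF: it emits nothing until the buffer is full, then emits one output row per buffered input.

```python
from typing import Callable, Iterator, List, Sequence, Tuple


class BatchingPredictor:
    """Buffers rows and calls the model once per batch (hypothetical helper)."""

    def __init__(
        self,
        predict_batch: Callable[[Sequence[str]], List[str]],
        batch_size: int = 4,
    ) -> None:
        if batch_size < 1:
            raise ValueError("batch_size must be >= 1")
        self._predict_batch = predict_batch
        self._batch_size = batch_size
        self._buffer: List[str] = []

    def eval(self, content: str) -> Iterator[Tuple[str, str]]:
        # Buffer the row; only run inference once a full batch is available.
        self._buffer.append(content)
        if len(self._buffer) >= self._batch_size:
            yield from self._flush()

    def finish(self) -> Iterator[Tuple[str, str]]:
        # Emit whatever is still buffered, e.g. at end of input.
        yield from self._flush()

    def _flush(self) -> Iterator[Tuple[str, str]]:
        if not self._buffer:
            return
        # Swap the buffer out before inference so a failure cannot re-emit rows.
        inputs, self._buffer = self._buffer, []
        outputs = self._predict_batch(inputs)
        if len(outputs) != len(inputs):
            raise ValueError("model must return one output per input row")
        yield from zip(inputs, outputs)
```

The trade-off this makes visible is latency: rows sit in the buffer until the batch fills, and the sketch leaves open how a real UDTF would flush the tail of the stream (here that is the caller's job via `finish`). That gap is part of why a vectorized approach may be worth exploring.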
