Hi Swapna and Geng,

Thanks for building the POC. I have some suggestions after reading the POC and the FLIP again.

1. I think we are missing `ModelProvider.Context` when creating the Python predict function [1]. This context can carry runtime config passed to ML_PREDICT through its config map, such as a timeout. Maybe you can add it to `getPythonPredictFunction` in `PythonPredictRuntimeProvider`.

2. `GenericPythonModelProviderFactory` seems to be a default, Flink-provided Python model provider factory. Do we want to add an abstraction such as `PythonModelProviderFactory` above it, so that `GenericPythonModelProviderFactory` is one implementation of it? Users could then implement other Python provider factories. It could have an abstract method called `createPythonPredictRuntimeProvider` (see the sketch below).

3. In your FLIP [2], in the `PythonPredictFunction#createPythonFunction` method, there is a `modelContext` whose origin I don't see; it looks like it comes from the constructor in your code.

4. Model information such as the schema is only available in the `GenericPythonPredictFunction` constructor; it should be provided in a generic way to all PythonPredictFunctions.

Thanks,
Hao

[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/ml/ModelProvider.java#L47
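A minimal sketch of the abstraction suggested in points 1 and 2 above. The class and method names (`PythonModelProviderFactory`, `PythonPredictRuntimeProvider`, `getPythonPredictFunction`, `createPythonPredictRuntimeProvider`) come from the POC discussion; the signatures and the placeholder `PythonPredictFunction` type are assumptions, not actual Flink APIs:

```java
// Sketch only: names follow the POC discussion above; signatures are assumed.
import org.apache.flink.table.ml.ModelProvider;

/** Abstraction above GenericPythonModelProviderFactory (point 2). */
interface PythonModelProviderFactory {

    /** Each factory creates a runtime provider for its own Python backend. */
    PythonPredictRuntimeProvider createPythonPredictRuntimeProvider();

    interface PythonPredictRuntimeProvider {
        /**
         * Point 1: hand the ModelProvider.Context through so runtime config
         * from ML_PREDICT (e.g. a timeout) reaches the Python predict function.
         */
        PythonPredictFunction getPythonPredictFunction(ModelProvider.Context context);
    }

    /** Placeholder for the POC's Python predict function wrapper. */
    interface PythonPredictFunction {}
}
```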
On Tue, Dec 16, 2025 at 9:25 AM Swapna Marru <[email protected]> wrote:

> Hi Geng,
>
> Thanks for the details.
>
> I just focussed on Thread Mode for the first POC. The idea was to extend to Process Mode later, after the overall design was approved.
> I will go through your impl, but from my previous checks I saw there were some limitations around sending context specific to each userDefinedFunction in Process Mode, as the proto carries `PythonFunctionInfo[] userDefinedFunctions` and a single runtime context is shared across all of them.
> We would need a separate model config passed for each of the functions. The proto could be extended, but I haven't drilled into it yet.
>
> >> I notice that your POC concentrates on the Thread Mode instead of the Process Mode.
>
> On Wed, Dec 10, 2025 at 6:38 PM Geng Biao <[email protected]> wrote:
>
> > Hi Swapna,
> > One more follow-up about the `PredictRuntimeContext`: I notice that your POC concentrates on the Thread Mode instead of the Process Mode. I'm not sure if the reason is that directly calling Python methods like `set_model_config` in Thread Mode is more convenient, but I should mention that in Process Mode it might not be ideal to directly call a Python method from the Java side. If it's just for passing model configurations, perhaps we can refer to my current implementation <https://github.com/bgeng777/flink/blob/b029aaca9163a4691d8ce4fcfc9a1a4d6cc67527/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java#L161> and reuse the _job_parameters in FunctionContext. This way, we might be able to directly reuse the existing UDF's `open` method with FunctionContext.
> >
> > Best,
> > Biao Geng
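A minimal sketch of what the `_job_parameters` suggestion could look like on the Java side. The key-prefix scheme and the helper class below are purely illustrative (this is not the actual ProtoUtils code); scoping each model's options by function name is one way to avoid collisions when a single set of job parameters is shared across all userDefinedFunctions, as noted above:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch only: scope each model's options under a per-function
 * key prefix so several ML_PREDICT calls in one job can share the single set
 * of job parameters without colliding. Key names are assumptions.
 */
public final class ModelJobParameters {

    public static Map<String, String> scope(String functionName, Map<String, String> modelOptions) {
        Map<String, String> params = new HashMap<>();
        for (Map.Entry<String, String> option : modelOptions.entrySet()) {
            // e.g. "ml-predict.my_predict_fn.model" -> "Qwen/Qwen3-0.6B"
            params.put("ml-predict." + functionName + "." + option.getKey(), option.getValue());
        }
        return params;
    }
}
```

The Python function would then read its own options back in `open` via `get_job_parameter`, as in the HuggingFace example further down the thread.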
> > > On Dec 10, 2025, at 01:11, Swapna Marru <[email protected]> wrote:
> > >
> > > Hi
> > >
> > > Thanks Geng Biao for the interest and the POC implementation. Sorry, I was away for a while and am in between some other stuff currently. I will be able to look back on this starting next week.
> > > Meanwhile, I would like to share my previous POC impl: https://github.com/apache/flink/commit/bb2d5c4aa096a4eb3a89adb6a5bfecd8e87d3262
> > > I will take a look at your implementation.
> > >
> > > In previous discussions with ShengKai, one more thing we wanted to explore is the flexibility of having the model provider factory also in Python, instead of having it in Java as proposed in the FLIP. This is something I haven't yet gotten time to dig deeper into.
> > >
> > > Yes, I agree on this.
> > >>> I notice that the parameter `model-directory-path` is introduced. It should work, but I see that some popular Python model libraries like `transformers` and `vllm` tend to directly use the parameter name `model`, which could be a local path or a simple name like `Qwen/Qwen3-0.6B`. Then they check whether `model` is a path or a simple name and automatically download it to a local cache directory if necessary. Considering use cases based on these libraries, maybe `model-directory-path` can be renamed to `model`, which can be either a real path or a simple model name.
> > >
> > > Yes, PredictRuntimeContext is more generic and is extensible. I will update the FLIP soon.
> > >>> There is a `set_model_config` in the Python PredictFunction interface. I find that in the previous discussion you are considering passing PredictRuntimeContext in `open`. I do agree with this choice as well, which makes the logic clearer, and users can just use the configs in the `open` method to init their model. I just find that the FLIP still has the `set_model_config`, so I just want to double check it here.
> > >
> > > -Thanks,
> > > M.Swapna
> > >
> > > On Tue, Dec 2, 2025 at 4:48 AM Geng Biao <[email protected]> wrote:
> > >
> > >> Hi folks,
> > >>
> > >> I am writing to follow up with this FLIP. I recently implemented a PoC for ML_PREDICT using a local Python model based on the current PyFlink udtf framework.
> > >> Codes can be found here: https://github.com/bgeng777/flink/commit/8eb002944ee8ce863680923ff53622eb10813777
> > >> I believe the proposal can work and want to share some findings:
> > >> My implementation follows most of the SQL section design of the FLIP; the only difference is that I use the 'model' option instead of 'model-directory-path' (see my previous reply in the thread for details), which I think is expressive enough.
> > >>
> > >> In the Python part, as ML_PREDICT is a specialized UDTF and PyFlink also supports UDTF, it is natural to directly reuse the udtf <https://github.com/apache/flink/blob/master/flink-python/pyflink/table/udf.py#L692> decorator to define a Python predict class. One point is that as the SQL primitive has a defined OUTPUT schema, we may need to enhance PyFlink's udtf interface to allow users to use it without specifying `result_types` and directly use the definition from the DDL. Another point is that we need to pass some model parameters (e.g. temperature, GPU/CPU settings) to the Python process, so we may need to enhance PyFlink to support such usage so that in the UDTF, users can directly get the model parameters.
> > >> After introducing such improvements, we can define the Python logic like this:
> > >>
> > >> ```
> > >> class HuggingFaceFunc(TableFunction):
> > >>     def __init__(self):
> > >>         self.model = None
> > >>         self.tokenizer = None
> > >>         self.pipeline = None
> > >>
> > >>     def open(self, runtime_context):
> > >>         model_dir = runtime_context.get_job_parameter("model", None)
> > >>         ...
> > >>         self.tokenizer = AutoTokenizer.from_pretrained(
> > >>             model_dir, trust_remote_code=True
> > >>         )
> > >>         self.model = AutoModelForCausalLM.from_pretrained(model_dir, device_map=device)
> > >>         self.pipeline = pipeline(
> > >>             "text-generation", model=self.model, tokenizer=self.tokenizer
> > >>         )
> > >>         assert self.model is not None
> > >>
> > >>     def eval(self, content: str, comment: str):
> > >>         output = self.pipeline(content)[0]["generated_text"]
> > >>         yield output, len(output)
> > >>
> > >>
> > >> HuggingFaceModelUDTF = udtf(HuggingFaceFunc())
> > >> ```
> > >>
> > >> I have implemented two Python predict classes to verify this design: one uses HuggingFace and the other is based on vLLM (codes can be found here <https://github.com/bgeng777/flink/blob/bgeng777/python-ml-predict-poc/flink-end-to-end-tests/flink-python-test/python/huggingface_udtf.py>).
> > >> So to conclude, I am wondering if we really need a new Python `class PredictFunction(TableFunction)`.
> > >>
> > >> Thanks for reading and looking forward to your thoughts.
> > >>
> > >> Best,
> > >> Biao Geng
> > >>
> > >>
> > >>> On Nov 21, 2025, at 19:28, Geng Biao <[email protected]> wrote:
> > >>>
> > >>> Hi Swapna,
> > >>>
> > >>> Thanks for the FLIP and sorry for the late reply. After reading it, I have some comments about the proposal.
> > >>>
> > >>> 1. I notice that the parameter `model-directory-path` is introduced. It should work, but I see that some popular Python model libraries like `transformers` and `vllm` tend to directly use the parameter name `model`, which could be a local path or a simple name like `Qwen/Qwen3-0.6B`. Then they check whether `model` is a path or a simple name and automatically download it to a local cache directory if necessary. Considering use cases based on these libraries, maybe `model-directory-path` can be renamed to `model`, which can be either a real path or a simple model name.
> > >>>
> > >>> 2. There is a `set_model_config` in the Python PredictFunction interface. I find that in the previous discussion you are considering passing PredictRuntimeContext in `open`. I do agree with this choice as well, which makes the logic clearer, and users can just use the configs in the `open` method to init their model. I just find that the FLIP still has the `set_model_config`, so I just want to double check it here.
> > >>>
> > >>> Thanks for your time!
> > >>>
> > >>> Best,
> > >>> Biao Geng
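A small illustration of point 1 above: interpreting a single `model` option as either a local directory or a model name only needs a path check on the provider side. The class and method names below are hypothetical; only the JDK calls are real:

```java
import java.nio.file.Files;
import java.nio.file.Paths;

/** Illustrative helper: decide how a single `model` option should be interpreted. */
public final class ModelOption {

    /** Returns true if `model` points at an existing local directory of model files. */
    public static boolean isLocalPath(String model) {
        return Files.isDirectory(Paths.get(model));
    }

    /** Example: "/opt/models/qwen" -> load directly; "Qwen/Qwen3-0.6B" -> resolve/download by name. */
    public static String describe(String model) {
        return isLocalPath(model) ? "load from local directory" : "resolve as a model name";
    }
}
```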
> > >>>> On Sep 22, 2025, at 23:36, Swapna Marru <[email protected]> wrote:
> > >>>>
> > >>>> Hi Devs,
> > >>>>
> > >>>> I am interested in learning more about the MODEL, ML_PREDICT, and ML_EVALUATE functionality added in the following FLIP:
> > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL
> > >>>>
> > >>>> I see the original FLIP has extensibility for local model providers in Flink.
> > >>>>
> > >>>> Is there a way to do pluggable local model providers in Python? Like, say, generating embeddings using Sentence Transformer models running locally in Flink.
> > >>>>
> > >>>> An option could be to introduce a model provider factory implementation in Java that internally uses a predict function in Python. But I see this puts in a lot of work related to Java-to-Python communication/translation inside the provider.
> > >>>>
> > >>>> Something like a PythonRuntimeProvider, along with PredictRuntimeProvider / AsyncRuntimeProvider, which can handle Java -> Python translations out of the box, would be helpful to de-duplicate that effort.
> > >>>>
> > >>>> Can you please point to any existing discussions related to this? Or any other ways to achieve the same? Please share your thoughts.
> > >>>>
> > >>>> -Thanks,
> > >>>> Swapna Marru
