Hi Geng,

Thanks for the details.

I focused only on Thread Mode for the first POC. The idea was to extend it to Process Mode later, after the overall design was approved. I will go through your implementation, but from my previous checks I saw some limitations around sending context specific to each userDefinedFunction in Process Mode, as the proto carries *PythonFunctionInfo[] userDefinedFunctions* and a single runtime context shared across all of them. We would need a separate model config passed for each of the functions. The proto could be extended, but I haven't drilled into it yet.

>> I notice that your POC concentrates on the Thread Mode instead of the Process Mode.

On Wed, Dec 10, 2025 at 6:38 PM Geng Biao <[email protected]> wrote:

> Hi Swapna,
>
> One more follow-up about the `PredictRuntimeContext`: I notice that your
> POC concentrates on the Thread Mode instead of the Process Mode. I'm not
> sure if the reason is that directly calling Python methods like
> `set_model_config` in Thread Mode is more convenient, but I should mention
> that in Process Mode, it might not be ideal to directly call a Python
> method from the Java side. If it's just for passing model configurations,
> perhaps we can refer to my current implementation
> <https://github.com/bgeng777/flink/blob/b029aaca9163a4691d8ce4fcfc9a1a4d6cc67527/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java#L161>
> and reuse the _job_parameters in FunctionContext. This way, we might be
> able to directly reuse the existing UDF's `open` method with
> FunctionContext.
>
> Best,
> Biao Geng
>
> > On Dec 10, 2025, at 01:11, Swapna Marru <[email protected]> wrote:
> >
> > Hi
> >
> > Thanks Geng Biao for the interest and the POC implementation. Sorry, I was
> > away for a while and am in between some other stuff currently.
> > I will be able to look back on this starting next week.
> > Meanwhile, I would like to share my previous POC implementation:
> > https://github.com/apache/flink/commit/bb2d5c4aa096a4eb3a89adb6a5bfecd8e87d3262
> >
> > I will take a look at your implementation.
> > In previous discussions with ShengKai, one more thing we wanted to explore
> > is the flexibility of having the model provider factory also in Python,
> > instead of having it in Java as proposed in the FLIP.
> > This is something I haven't yet had time to dig into.
> >
> > Yes, I agree on this.
> >>> I notice that the parameter `model-directory-path` is introduced. It
> >>> should work but I see that some popular Python model libraries like
> >>> `transformers` and `vllm` tend to directly use the parameter name `model`,
> >>> which could be a local path or a simple name like `Qwen/Qwen3-0.6B`. Then
> >>> they check if `model` is a path or a simple name and automatically download
> >>> it to a local cache directory if necessary. Considering use cases based
> >>> on these libraries, maybe `model-directory-path` can be renamed to `model`,
> >>> which can be either a real path or a simple model name.
> >
> > Yes, PredictRuntimeContext is more generic and is extensible. I will
> > update the FLIP soon.
> >>> There is a `set_model_config` in the Python PredictFunction interface.
> >>> I find that in the previous discussion, you are considering passing
> >>> PredictRuntimeContext in `open`. I do agree with this choice as well,
> >>> which makes the logic clearer, and users can just use the configs in the
> >>> `open` method to init their model. I just find that the FLIP still has
> >>> the `set_model_config`, so I just want to double check it here.
> >
> > -Thanks,
> > M.Swapna
> >
> > On Tue, Dec 2, 2025 at 4:48 AM Geng Biao <[email protected]> wrote:
> >
> >> Hi folks,
> >>
> >> I am writing to follow up on this FLIP. I recently implemented a PoC for
> >> ML_PREDICT using a local Python model based on PyFlink's current udtf
> >> framework.
> >> The code can be found here:
> >> https://github.com/bgeng777/flink/commit/8eb002944ee8ce863680923ff53622eb10813777
> >> I believe the proposal can work and want to share some findings:
> >> My implementation follows most of the SQL section design of the FLIP. The
> >> only difference is that I use a 'model' option instead of
> >> 'model-directory-path' (see my previous reply in this thread for details),
> >> which I think is expressive enough.
> >>
> >> In the Python part, as ML_PREDICT is a specialized UDTF and PyFlink
> >> also supports UDTFs, it is natural to directly reuse the udtf
> >> <https://github.com/apache/flink/blob/master/flink-python/pyflink/table/udf.py#L692>
> >> decorator to define a Python predict class. One point is that, as the SQL
> >> primitive has a defined OUTPUT schema, we may need to enhance PyFlink's
> >> udtf interface to allow users to use it without specifying `result_types`
> >> and directly use the definition from the DDL. Another point is that we need
> >> to pass some model parameters (e.g. temperature, GPU/CPU settings) to the
> >> Python process, so we may need to enhance PyFlink to support such usage so
> >> that in the UDTF, users can directly get the model parameters. After
> >> introducing such improvements, we can define the Python logic like this:
> >>
> >> ```
> >> from pyflink.table.udf import TableFunction, udtf
> >> from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
> >>
> >>
> >> class HuggingFaceFunc(TableFunction):
> >>     def __init__(self):
> >>         self.model = None
> >>         self.tokenizer = None
> >>         self.pipeline = None
> >>
> >>     def open(self, runtime_context):
> >>         # Model path or name, read from the job parameters.
> >>         model_dir = runtime_context.get_job_parameter("model", None)
> >>         # Elided in the original message; `device` is presumably set here.
> >>         ...
> >>         self.tokenizer = AutoTokenizer.from_pretrained(
> >>             model_dir, trust_remote_code=True
> >>         )
> >>
> >>         self.model = AutoModelForCausalLM.from_pretrained(
> >>             model_dir, device_map=device
> >>         )
> >>         self.pipeline = pipeline(
> >>             "text-generation", model=self.model, tokenizer=self.tokenizer
> >>         )
> >>         assert self.model is not None
> >>
> >>     def eval(self, content: str, comment: str):
> >>         output = self.pipeline(content)[0]["generated_text"]
> >>         yield output, len(output)
> >>
> >>
> >> # No result_types here: this relies on the proposed enhancement that
> >> # takes the output schema from the DDL.
> >> HuggingFaceModelUDTF = udtf(HuggingFaceFunc())
> >> ```
> >>
> >> I have implemented two Python predict classes to verify this design, one
> >> using HuggingFace and the other based on vLLM (the code can be found here:
> >> <https://github.com/bgeng777/flink/blob/bgeng777/python-ml-predict-poc/flink-end-to-end-tests/flink-python-test/python/huggingface_udtf.py>).
> >> So to conclude, I am wondering if we really need a new Python `class
> >> PredictFunction(TableFunction)`.
> >>
> >> Thanks for reading and looking forward to your thoughts.
> >>
> >> Best,
> >> Biao Geng
> >>
> >>> On Nov 21, 2025, at 19:28, Geng Biao <[email protected]> wrote:
> >>>
> >>> Hi Swapna,
> >>>
> >>> Thanks for the FLIP and sorry for the late reply. After reading it, I
> >>> have some comments about the proposal.
> >>>
> >>> 1. I notice that the parameter `model-directory-path` is introduced. It
> >>> should work but I see that some popular Python model libraries like
> >>> `transformers` and `vllm` tend to directly use the parameter name `model`,
> >>> which could be a local path or a simple name like `Qwen/Qwen3-0.6B`. Then
> >>> they check if `model` is a path or a simple name and automatically download
> >>> it to a local cache directory if necessary. Considering use cases based
> >>> on these libraries, maybe `model-directory-path` can be renamed to `model`,
> >>> which can be either a real path or a simple model name.
> >>>
> >>> 2. There is a `set_model_config` in the Python PredictFunction interface.
> >>> I find that in the previous discussion, you are considering passing
> >>> PredictRuntimeContext in `open`. I do agree with this choice as well,
> >>> which makes the logic clearer, and users can just use the configs in
> >>> the `open` method to init their model. I just find that the FLIP still
> >>> has the `set_model_config`, so I just want to double check it here.
> >>>
> >>> Thanks for your time!
> >>>
> >>> Best,
> >>> Biao Geng
> >>>
> >>>> On Sep 22, 2025, at 23:36, Swapna Marru <[email protected]> wrote:
> >>>>
> >>>> Hi Devs,
> >>>>
> >>>> I am interested in learning more about the MODEL, ML_PREDICT, and
> >>>> ML_EVALUATE functionalities added in the following FLIP:
> >>>>
> >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL
> >>>>
> >>>> I see the original FLIP has extensibility to local model providers in
> >>>> Flink.
> >>>>
> >>>> Is there a way to do pluggable local model providers in Python? Like,
> >>>> say, generate embeddings using Sentence Transformer models running
> >>>> locally in Flink.
> >>>>
> >>>> An option could be to introduce a Model Provider factory implementation
> >>>> in Java that internally uses a predict function in Python. But I see
> >>>> this puts in a lot of work related to Java to Python
> >>>> communication/translation inside the provider.
> >>>>
> >>>> Something like PythonRuntimeProvider along with PredictRuntimeProvider /
> >>>> AsyncRuntimeProvider which can handle Java -> Python translations out of
> >>>> the box would be helpful to de-duplicate that effort.
> >>>>
> >>>> Can you please point me to any existing discussions related to this?
> >>>> Or any other ways to achieve the same? Please share your thoughts.
> >>>>
> >>>> -Thanks,
> >>>>
> >>>> Swapna Marru
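
The `model` option discussed in this thread can be either a local path or a simple name like `Qwen/Qwen3-0.6B`. A minimal sketch of how a predict function could tell the two apart before loading the model is below. `resolve_model_source` is a hypothetical helper introduced only for illustration; it is not part of PyFlink, transformers, or vLLM, and it only classifies the value without downloading anything.

```python
import os
import tempfile
from typing import Tuple


def resolve_model_source(model: str) -> Tuple[str, str]:
    """Classify a `model` option value (hypothetical helper).

    Returns ("local", absolute_path) when the value points at an existing
    file or directory, otherwise ("hub", model) so that the caller can hand
    the name to a library that resolves and caches it on its own.
    """
    expanded = os.path.expanduser(model)
    if os.path.exists(expanded):
        return "local", os.path.abspath(expanded)
    return "hub", model


if __name__ == "__main__":
    # An existing directory is treated as a local model path.
    with tempfile.TemporaryDirectory() as model_dir:
        print(resolve_model_source(model_dir))
    # Anything that is not an existing path is treated as a model name.
    print(resolve_model_source("Qwen/Qwen3-0.6B"))
```

In an `open` method, the value read with `get_job_parameter("model", None)` could be passed through such a check first, for example to fail early when a path was intended but does not exist on the worker.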
