Hi Rahul,

Thanks for the FLIP. I think leveraging the LLM services' batch API is quite valuable. After reading the design doc, though, I want to check how users would actually use the batch API with the current design. For example, do we still use SQL (e.g., CREATE MODEL with some new options in the `WITH` clause, as described in the doc's `public interfaces` section)? If that's the case, Option 2 seems not to output any result directly, and an extra pipeline would have to be introduced to extract the final result, which makes it no longer a single-SQL solution.
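To make the question concrete, here is roughly the kind of SQL I imagine a user writing, following the style already used in this thread; the batch-related option names ('batch.mode', 'batch.size', 'batch.timeout.ms') are placeholders for illustration, not options taken from the FLIP:

```sql
-- Rough sketch only: the batch-related options below are placeholder names
-- for illustration, not options defined in the FLIP.
CREATE MODEL llm_batch_model WITH (
  'provider' = 'openai',
  'model' = 'gpt-4.1-mini',
  'api_key' = '${API_KEY}',
  'batch.mode' = 'true',
  'batch.size' = '1000',
  'batch.timeout.ms' = '60000'
);

-- If this is still the entry point, what does the query emit under Option 2:
-- the model responses, or only a batch id that a separate (non-Flink) process
-- must poll and join back with the original records?
SELECT id, question, ML_PREDICT('llm_batch_model', question) AS answer
FROM questions;
```

If the query only emits batch ids, that matches my concern above: users would still need an extra pipeline to turn those ids into the final results.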
Best, Biao Geng Rahul Bhattacharya <[email protected]> 于2025年11月21日周五 10:38写道: > Sorry the example json did not include the original question in the > request, this is a better example . notice i have the original_question in > metadata and i am not using any custom_id. metadata has no field length > restrictions > > { > "method": "POST", > "url": "/v1/chat/completions", > "body": { > "model": "gpt-4.1-mini", > "metadata": { > "topic": "questions-topic", > "partition": 5, > "offset": 10223, > "original_question": "what is 2+5" > }, > "messages": [ > { "role": "user", "content": "what is 2+5" } > ] > } > } > > Sample response for above request > > { > "response": { > "metadata": { > "topic": "questions-topic", > "partition": 5, > "offset": 10223, > "original_question": "what is 2+5" > }, > "choices": [ > { "message": { "role": "assistant", "content": "2+5 = 7" } } > ] > } > } > > > On Thu, Nov 20, 2025 at 8:32 PM Rahul Bhattacharya <[email protected]> > wrote: > > > Hi Hao, > > the custom_id field is not mandatory. > > it's a field to match the answer to the question, but you can't use very > > long strings in custom_id. > > my suggestion will be to not pass any custom_id, but use the metadata > > field instead which does not have any restrictions. > > > > Sample jsonl request file > > > > { > > "method": "POST", > > "url": "/v1/chat/completions", > > "body": { > > "model": "gpt-4.1-mini", > > "metadata": { > > "topic": "questions-topic", > > "partition": 5, > > "offset": 10223, > > "row_id": 789 > > }, > > "messages": [ > > { "role": "user", "content": "what is 2+5" } > > ] > > } > > } > > > > Sample response for above request > > > > { > > "response": { > > "metadata": { > > "topic": "questions-topic", > > "partition": 5, > > "offset": 10223, > > "row_id": 789 > > }, > > "choices": [ > > { "message": { "role": "assistant", "content": "2+5 = 7" } } > > ] > > } > > } > > > > > > For the second question, the status of the batch is for the entire batch > > of records. Once the batch shows completed users can download the > response > > file which will have many lines/records like the above json > > > > > > regards > > Rahul > > > > On Thu, Nov 20, 2025 at 4:49 PM Hao Li via dev <[email protected]> > > wrote: > > > >> Hi Rahul, > >> > >> FYI, the FLIP process involves a voting process as well [1] before > >> implementation. I have two more question regarding sending batch id > >> downstream: > >> > >> 1. For openai batch api specifically, does it needs a `custom_id` for > each > >> request? Where is this custom_id from? > >> 2. If you batch several requests, they will get the same batch_id as the > >> response. Does downstream need to poll them for each request even though > >> they all have the same batch_id? > >> > >> [1] > >> > >> > https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-Accepted > >> > >> Thanks, > >> Hao > >> > >> On Thu, Nov 20, 2025 at 2:09 PM Rahul Bhattacharya <[email protected] > > > >> wrote: > >> > >> > i created a Jira for this > >> > https://issues.apache.org/jira/browse/FLINK-38708 > >> > > >> > On Fri, Nov 14, 2025 at 2:04 PM Hao Li <[email protected]> > >> wrote: > >> > > >> > > Hi Shengkai, > >> > > > >> > > > process them together, and complete the future objects > sequentially > >> as > >> > > they finish > >> > > > >> > > The batch api Rahul proposed is [1] which sends batch request from > >> file, > >> > > return an id and we need to poll the id for results. 
The important > >> part > >> > is > >> > > it can take 24 hours to finish. So Rahua is proposing to just send > the > >> > > result id to downstream. > >> > > > >> > > [1] https://platform.openai.com/docs/guides/batch > >> > > > >> > > On Fri, Nov 14, 2025 at 8:35 AM Rahul Bhattacharya < > >> [email protected]> > >> > > wrote: > >> > > > >> > > > Hi Shengkai, > >> > > > so i am understanding we will go with option 1 and send the > batchid > >> > > > downstream to do whatever the user needs to do with the batchids? > >> > > > > >> > > > i also in the opinion that option 1 is a better option for now > than > >> > > option > >> > > > 2. > >> > > > Based on a parameter setting we should batch n records , create a > >> > > > jsonl file and post it to the LLM batch api. > >> > > > The LLM will immediately return the batch id which we can just > send > >> it > >> > > > downstream. this implementation will be stateless and really > simple > >> to > >> > > > implement > >> > > > > >> > > > > >> > > > regards > >> > > > > >> > > > > >> > > > > >> > > > On Fri, Nov 14, 2025 at 4:18 AM Shengkai Fang <[email protected]> > >> > wrote: > >> > > > > >> > > > > Hi. Rahul > >> > > > > > >> > > > > First, I believe we don't need to modify the framework. Instead, > >> we > >> > can > >> > > > > have the async predict function collect records in batches, > >> process > >> > > them > >> > > > > together, and complete the future objects sequentially as they > >> > finish. > >> > > > This > >> > > > > approach allows us to move forward quickly. > >> > > > > > >> > > > > Second, I have concerns about introducing state or external > >> storage. > >> > On > >> > > > one > >> > > > > hand, the current design is stateless, and transitioning to a > >> > stateful > >> > > > > architecture would require significant refactoring. On the other > >> > hand, > >> > > I > >> > > > > don't see clear advantages to storing batch IDs in state, since > we > >> > > cannot > >> > > > > guarantee that elements will arrive in the same order after > >> restoring > >> > > > from > >> > > > > a checkpoint. For example, if the ML predictor receives elements > >> e1, > >> > > e2, > >> > > > e3 > >> > > > > in the first run, it might receive e2, e3, e1 after recovery. > >> With a > >> > > > batch > >> > > > > size of 2, we wouldn't be able to reuse the in-flight requests > >> > > > effectively. > >> > > > > > >> > > > > Finally, I suggest we leverage the IOManager to handle JSON file > >> > > > > management. > >> > > > > > >> > > > > Best, > >> > > > > Shengkai > >> > > > > > >> > > > > > >> > > > > Rahul Bhattacharya <[email protected]> 于2025年11月11日周二 > 09:03写道: > >> > > > > > >> > > > > > Hi Hao > >> > > > > > For option 2 the guarantees like exactly once and other things > >> are > >> > > not > >> > > > > > guaranteed as it’s not in the same flink process > >> > > > > > > >> > > > > > The flink process finishes after the submission and getting > >> > batchids > >> > > > > which > >> > > > > > it sends downstream > >> > > > > > > >> > > > > > There has to be another process(doesn’t have to be flink) > which > >> > > takes > >> > > > > > these batchids and polls the OpenAI endpoint for status > >> completed. 
> >> > > > > > > >> > > > > > Once it gets completed it downloads the results and sends > >> > downstream > >> > > > > > > >> > > > > > This secondary process is on client discretion , for Kafka > >> > probably a > >> > > > > http > >> > > > > > sink connector or Kafka consumer > >> > > > > > > >> > > > > > Thanks And Regards > >> > > > > > Rahul > >> > > > > > > >> > > > > > > >> > > > > > On Mon, Nov 10, 2025 at 6:45 PM Hao Li > <[email protected] > >> > > >> > > > wrote: > >> > > > > > > >> > > > > > > Hi Rahul, > >> > > > > > > > >> > > > > > > Thanks for the proposal. From some offline discussion, the > >> > endpoint > >> > > > you > >> > > > > > > have in mind to support is OpenAI batch API [1]. The doc > >> states > >> > > that > >> > > > > > "Each > >> > > > > > > batch completes within 24 hours (and often more quickly)". > >> With > >> > > this > >> > > > > > > context, I have some questions: > >> > > > > > > > >> > > > > > > 1. For design option 1, does the operator always wait for > >> batch > >> > > > > response > >> > > > > > > until processing next batch? This can take 24 hours which > >> isn't > >> > > > > feasible > >> > > > > > > for streaming job I think. > >> > > > > > > > >> > > > > > > 2. For design option 2, why it loses exact-once and have > >> higher > >> > > > latency > >> > > > > > > compared to 1? > >> > > > > > > > >> > > > > > > 3. Also for the public interface section, are the parameters > >> in > >> > > > > > > `ml_predict` config or in options when `create model`? > >> > > > > > > > >> > > > > > > Thanks, > >> > > > > > > Hao > >> > > > > > > > >> > > > > > > > >> > > > > > > [1] https://platform.openai.com/docs/guides/batch/batch-api > >> > > > > > > > >> > > > > > > On Mon, Nov 10, 2025 at 10:19 AM Asimansu Bera < > >> > > > > [email protected]> > >> > > > > > > wrote: > >> > > > > > > > >> > > > > > > > +1 > >> > > > > > > > > >> > > > > > > > This proposal is needed for optimizing network calls and > >> > > processing > >> > > > > > asyc. > >> > > > > > > > > >> > > > > > > > Thanks > >> > > > > > > > Asimansu > >> > > > > > > > > >> > > > > > > > On Mon, Nov 10, 2025 at 4:04 AM Shengkai Fang < > >> > [email protected] > >> > > > > >> > > > > > wrote: > >> > > > > > > > > >> > > > > > > > > Hi, Rahul. > >> > > > > > > > > > >> > > > > > > > > +1 for this proposal. However, due to my current > workload, > >> > I'll > >> > > > > need > >> > > > > > > > until > >> > > > > > > > > the end of this week to review it thoroughly. > >> > > > > > > > > > >> > > > > > > > > Best, > >> > > > > > > > > Shengkai > >> > > > > > > > > > >> > > > > > > > > Rahul Bhattacharya <[email protected]> 于2025年11月9日周日 > >> > > 13:08写道: > >> > > > > > > > > > >> > > > > > > > > > Hi , > >> > > > > > > > > > i have created a draft FLIP for it > >> > > > > > > > > > > >> > > > > > > > >> > > > > >> https://docs.google.com/document/d/1U-eSuKwi5vIgAPt6ZBvb-RcbcRJiRy0e/ > >> > > > > > > > > > > >> > > > > > > > > > Please let me know your thoughts > >> > > > > > > > > > > >> > > > > > > > > > regards > >> > > > > > > > > > Rahul > >> > > > > > > > > > > >> > > > > > > > > > On Sat, Nov 8, 2025 at 5:54 PM Rahul Bhattacharya < > >> > > > > > > [email protected] > >> > > > > > > > > > >> > > > > > > > > > wrote: > >> > > > > > > > > > > >> > > > > > > > > > > Hi, > >> > > > > > > > > > > I actually thought of reworking my previous > response. 
> >> I > >> > > want > >> > > > > the > >> > > > > > > > table > >> > > > > > > > > > api > >> > > > > > > > > > > to create jsonl files and call openai/claude batch > >> apis. > >> > > > > > > > > > > The implementation I am doing is going to batch the > >> > records > >> > > > > into > >> > > > > > a > >> > > > > > > > file > >> > > > > > > > > > > and call the api with the file and then continuously > >> poll > >> > > the > >> > > > > > > repose > >> > > > > > > > to > >> > > > > > > > > > see > >> > > > > > > > > > > the status of the batch and then use that to write > the > >> > > > response > >> > > > > > > > > records. > >> > > > > > > > > > > The ML_Predict in its current form is not usable as > >> > people > >> > > > are > >> > > > > > not > >> > > > > > > > > > looking > >> > > > > > > > > > > for synchronous response which is twice as expensive > >> as > >> > the > >> > > > > > > > > asynchronous > >> > > > > > > > > > > response. > >> > > > > > > > > > > let me know you thoughts and i can create a FLIP for > >> it > >> > > > > > > > > > > regards > >> > > > > > > > > > > > >> > > > > > > > > > > On Sat, Nov 8, 2025 at 3:14 PM Rahul Bhattacharya < > >> > > > > > > > [email protected] > >> > > > > > > > > > > >> > > > > > > > > > > wrote: > >> > > > > > > > > > > > >> > > > > > > > > > >> Hi Flink Community, > >> > > > > > > > > > >> > >> > > > > > > > > > >> I'm interested in contributing an enhancement to > >> Apache > >> > > > > Flink's > >> > > > > > > > > > >> ML_PREDICT > >> > > > > > > > > > >> functionality for LLM interactions. I'd like to > gauge > >> > > > > community > >> > > > > > > > > interest > >> > > > > > > > > > >> and get > >> > > > > > > > > > >> early feedback before proceeding with detailed > design > >> > or a > >> > > > > FLIP. > >> > > > > > > > > > >> > >> > > > > > > > > > >> ## Problem Statement > >> > > > > > > > > > >> > >> > > > > > > > > > >> Currently, when using Flink SQL's ML_PREDICT with > LLM > >> > > > > endpoints, > >> > > > > > > > each > >> > > > > > > > > > >> record > >> > > > > > > > > > >> triggers an individual API call. 
For a stream > >> processing > >> > > > 1000 > >> > > > > > > > > > >> records/second, > >> > > > > > > > > > >> this results in: > >> > > > > > > > > > >> > >> > > > > > > > > > >> - **1000 separate API calls per second** > >> > > > > > > > > > >> - **High latency**: Each call has network overhead > + > >> API > >> > > > > > > processing > >> > > > > > > > > time > >> > > > > > > > > > >> - **High cost**: Most LLM providers charge per > token, > >> > and > >> > > > lack > >> > > > > > of > >> > > > > > > > > > >> batching means > >> > > > > > > > > > >> no cost optimization > >> > > > > > > > > > >> - **Rate limiting issues**: Hitting provider rate > >> limits > >> > > > > quickly > >> > > > > > > > > > >> - **Poor throughput**: API calls are serialized per > >> > record > >> > > > > > > > > > >> > >> > > > > > > > > > >> ### Current Behavior (Inefficient) > >> > > > > > > > > > >> ```sql > >> > > > > > > > > > >> -- This makes 10 individual API calls > >> > > > > > > > > > >> SELECT id, ML_PREDICT('llm_model', text) as result > >> > > > > > > > > > >> FROM (VALUES > >> > > > > > > > > > >> (1, 'text1'), (2, 'text2'), ..., (10, 'text10') > >> > > > > > > > > > >> ) AS t(id, text); > >> > > > > > > > > > >> ``` > >> > > > > > > > > > >> **Result**: 10 separate HTTP requests, 10x latency, > >> 10x > >> > > > > overhead > >> > > > > > > > > > >> > >> > > > > > > > > > >> ## Proposed Solution: Application-Level Batching > with > >> > > Prompt > >> > > > > > > > > Engineering > >> > > > > > > > > > >> > >> > > > > > > > > > >> Since most LLM APIs (OpenAI, Anthropic Claude, > etc.) > >> > don't > >> > > > > > provide > >> > > > > > > > > > native > >> > > > > > > > > > >> batch > >> > > > > > > > > > >> endpoints, we propose implementing batching at the > >> > > > application > >> > > > > > > level > >> > > > > > > > > by: > >> > > > > > > > > > >> > >> > > > > > > > > > >> 1. **Accumulating N records** into a single batch > >> > > > > > > > > > >> 2. **Injecting records into a structured prompt** > >> that > >> > > > > instructs > >> > > > > > > the > >> > > > > > > > > LLM > >> > > > > > > > > > >> to > >> > > > > > > > > > >> process multiple items > >> > > > > > > > > > >> 3. **Parsing structured responses** to extract > >> results > >> > for > >> > > > > each > >> > > > > > > > record > >> > > > > > > > > > >> 4. **Emitting individual results** back to the > Flink > >> > > > pipeline > >> > > > > > > > > > >> > >> > > > > > > > > > >> ### How It Works > >> > > > > > > > > > >> > >> > > > > > > > > > >> **Step 1: Batch Accumulation** > >> > > > > > > > > > >> Collect up to `batch.size` records or wait up to ` > >> > > > > > > batch.timeout.ms` > >> > > > > > > > > > >> > >> > > > > > > > > > >> **Step 2: Prompt Construction** > >> > > > > > > > > > >> > >> > > > > > > > > > >> System: You are a sentiment analyzer. Process each > >> item > >> > > and > >> > > > > > > respond > >> > > > > > > > > with > >> > > > > > > > > > >> JSON. > >> > > > > > > > > > >> > >> > > > > > > > > > >> User: Analyze the sentiment of these texts. Return > a > >> > JSON > >> > > > > array > >> > > > > > > with > >> > > > > > > > > one > >> > > > > > > > > > >> object per input containing "index" and "sentiment" > >> > > fields. > >> > > > > > > > > > >> > >> > > > > > > > > > >> Input 1: "This product is amazing!" Input 2: > >> "Terrible > >> > > > > > experience, > >> > > > > > > > > very > >> > > > > > > > > > >> disappointed" Input 3: "It's okay, nothing special" > >> ... 
> >> > > > Input > >> > > > > > 10: > >> > > > > > > > > "Best > >> > > > > > > > > > >> purchase ever!" > >> > > > > > > > > > >> > >> > > > > > > > > > >> Respond with: [{"index": 1, "sentiment": "..."}, > >> > {"index": > >> > > > 2, > >> > > > > > > > > > >> "sentiment": "..."}, ...] > >> > > > > > > > > > >> > >> > > > > > > > > > >> **Step 3: Response Parsing** > >> > > > > > > > > > >> ```json > >> > > > > > > > > > >> [ > >> > > > > > > > > > >> {"index": 1, "sentiment": "positive"}, > >> > > > > > > > > > >> {"index": 2, "sentiment": "negative"}, > >> > > > > > > > > > >> {"index": 3, "sentiment": "neutral"}, > >> > > > > > > > > > >> ... > >> > > > > > > > > > >> {"index": 10, "sentiment": "positive"} > >> > > > > > > > > > >> ] > >> > > > > > > > > > >> ``` > >> > > > > > > > > > >> > >> > > > > > > > > > >> **Step 4: Result Distribution** > >> > > > > > > > > > >> Parse JSON and emit individual results back to > >> > > corresponding > >> > > > > > > records > >> > > > > > > > > > >> > >> > > > > > > > > > >> ### Model Configuration (Defaults) > >> > > > > > > > > > >> ```sql > >> > > > > > > > > > >> CREATE MODEL llm_sentiment WITH ( > >> > > > > > > > > > >> 'provider' = 'openai', > >> > > > > > > > > > >> 'model' = 'gpt-4', > >> > > > > > > > > > >> 'api_key' = '${API_KEY}', > >> > > > > > > > > > >> 'batch.size' = '20', > >> > > > > > > > > > >> 'batch.timeout.ms' = '1000', > >> > > > > > > > > > >> 'system.prompt' = 'You are a sentiment > analyzer. > >> > > Always > >> > > > > > > respond > >> > > > > > > > > with > >> > > > > > > > > > >> valid JSON.', > >> > > > > > > > > > >> 'batch.prompt.template' = 'Analyze sentiment > for > >> > these > >> > > > > > texts. > >> > > > > > > > > Return > >> > > > > > > > > > >> JSON array: [{"index": <n>, "sentiment": > >> > > > > > > > > > "<positive|negative|neutral>"}]', > >> > > > > > > > > > >> 'response.format' = 'json', > >> > > > > > > > > > >> 'response.path' = '$[*]', -- JSONPath to > extract > >> > > array > >> > > > of > >> > > > > > > > results > >> > > > > > > > > > >> 'response.index.field' = 'index', -- Field > >> > containing > >> > > > > > record > >> > > > > > > > > index > >> > > > > > > > > > >> 'response.value.field' = 'sentiment' -- Field > >> > > > containing > >> > > > > > > result > >> > > > > > > > > > >> ); > >> > > > > > > > > > >> ``` > >> > > > > > > > > > >> > >> > > > > > > > > > >> ### Query Usage (Use Defaults) > >> > > > > > > > > > >> ```sql > >> > > > > > > > > > >> -- Uses batch_size=20 from model definition > >> > > > > > > > > > >> SELECT id, text, ML_PREDICT('llm_sentiment', text) > as > >> > > > > sentiment > >> > > > > > > > > > >> FROM customer_reviews; > >> > > > > > > > > > >> ``` > >> > > > > > > > > > >> > >> > > > > > > > > > >> ### Query Usage (Override for Custom Analysis) > >> > > > > > > > > > >> ```sql > >> > > > > > > > > > >> -- Override prompt and batch size for different use > >> case > >> > > > > > > > > > >> SELECT id, text, ML_PREDICT('llm_sentiment', text, > >> > > > > > > > > > >> MAP['batch.size', '50', > >> > > > > > > > > > >> 'batch.prompt.template', 'Extract key > >> entities. 
> >> > > > Return > >> > > > > > > JSON: > >> > > > > > > > > > >> [{"index": <n>, "entities": [...]}]', > >> > > > > > > > > > >> 'response.value.field', 'entities']) as > >> entities > >> > > > > > > > > > >> FROM documents; > >> > > > > > > > > > >> ``` > >> > > > > > > > > > >> > >> > > > > > > > > > >> ## Performance and Cost Impact > >> > > > > > > > > > >> > >> > > > > > > > > > >> ### Example: Processing 10,000 customer reviews > >> > > > > > > > > > >> > >> > > > > > > > > > >> **Current (unbatched)**: > >> > > > > > > > > > >> - 10,000 API calls > >> > > > > > > > > > >> - ~10,000 x 200ms latency = 2,000 seconds total > >> > processing > >> > > > > time > >> > > > > > > > > > >> (serialized) > >> > > > > > > > > > >> - ~10,000 x $0.002 = $20 in API costs > >> > > > > > > > > > >> - High rate limit pressure > >> > > > > > > > > > >> > >> > > > > > > > > > >> **With batching (batch_size=20)**: > >> > > > > > > > > > >> - 500 API calls (10,000 / 20) > >> > > > > > > > > > >> - ~500 x 300ms latency = 150 seconds total > processing > >> > time > >> > > > > > > > > > >> - ~500 x $0.006 = $3 in API costs (slightly higher > >> per > >> > > call > >> > > > > due > >> > > > > > to > >> > > > > > > > > > larger > >> > > > > > > > > > >> prompts, > >> > > > > > > > > > >> but still 85% cheaper overall) > >> > > > > > > > > > >> - **20x fewer API calls** > >> > > > > > > > > > >> - **13x faster processing** > >> > > > > > > > > > >> - **85% cost reduction** > >> > > > > > > > > > >> > >> > > > > > > > > > >> ## Proposed Implementation > >> > > > > > > > > > >> > >> > > > > > > > > > >> ### Configuration Parameters > >> > > > > > > > > > >> > >> > > > > > > > > > >> **Model-level (defaults)**: > >> > > > > > > > > > >> - `batch.size`: Maximum records per batch > (default: 1 > >> > for > >> > > > > > backward > >> > > > > > > > > > >> compatibility) > >> > > > > > > > > > >> - `batch.timeout.ms`: Max time to wait before > >> flushing > >> > > > > > incomplete > >> > > > > > > > > batch > >> > > > > > > > > > >> (default: 1000ms) > >> > > > > > > > > > >> - `system.prompt`: System-level instruction for the > >> LLM > >> > > > > > > > > > >> - `batch.prompt.template`: Template explaining how > to > >> > > > process > >> > > > > > > > batched > >> > > > > > > > > > >> inputs > >> > > > > > > > > > >> - `response.format`: Expected response format > >> ('json', > >> > > > 'xml', > >> > > > > > > > > > 'delimited') > >> > > > > > > > > > >> - `response.path`: JSONPath or XPath to extract > >> results > >> > > > array > >> > > > > > > > > > >> - `response.index.field`: Field name containing the > >> > record > >> > > > > index > >> > > > > > > > > > >> - `response.value.field`: Field name containing the > >> > actual > >> > > > > > result > >> > > > > > > > > > >> - `max.retries`: Retry attempts for failed batches > >> > > (default: > >> > > > > 3) > >> > > > > > > > > > >> - `request.timeout.ms`: Timeout for API calls > >> (default: > >> > > > > > 30000ms) > >> > > > > > > > > > >> > >> > > > > > > > > > >> **Query-level (overrides)**: > >> > > > > > > > > > >> - Any of the above can be overridden via MAP > >> parameter > >> > in > >> > > > > > > ML_PREDICT > >> > > > > > > > > > >> - Per-query customization for different analysis > >> tasks > >> > > > > > > > > > >> > >> > > > > > > > > > >> ### Key Features > >> > > > > > > > > > >> 1. 
**Prompt injection**: Automatically construct > >> batch > >> > > > prompts > >> > > > > > > with > >> > > > > > > > > > >> indexed inputs > >> > > > > > > > > > >> 2. **Structured response parsing**: Support JSON, > >> XML, > >> > or > >> > > > > > > delimited > >> > > > > > > > > > >> formats > >> > > > > > > > > > >> 3. **Index tracking**: Maintain record-to-result > >> mapping > >> > > > > through > >> > > > > > > the > >> > > > > > > > > > batch > >> > > > > > > > > > >> 4. **Error handling**: Handle parsing failures, > >> missing > >> > > > > indices, > >> > > > > > > > > > >> malformed responses > >> > > > > > > > > > >> 5. **Fallback to individual calls**: If batch > fails, > >> > > > > optionally > >> > > > > > > > retry > >> > > > > > > > > > >> records individually > >> > > > > > > > > > >> 6. **Provider-agnostic**: Works with any LLM API > >> > (OpenAI, > >> > > > > > > Anthropic, > >> > > > > > > > > > >> Azure, self-hosted) > >> > > > > > > > > > >> 7. **Async processing**: Non-blocking batch > requests > >> > > > > > > > > > >> 8. **Back-pressure**: Proper flow control when API > is > >> > slow > >> > > > > > > > > > >> 9. **Backward compatible**: batch.size=1 maintains > >> > current > >> > > > > > > behavior > >> > > > > > > > > > >> > >> > > > > > > > > > >> ### Technical Approach > >> > > > > > > > > > >> - Extend existing ML_PREDICT infrastructure > >> > > > > > > > > > >> - Add batching buffer in the ML_PREDICT operator > >> > > > > > > > > > >> - Implement prompt template engine for batch > >> > construction: > >> > > > > > > > > > >> - Inject record index + content into template > >> > > > > > > > > > >> - Support various templating formats (JSON, XML, > >> plain > >> > > > text) > >> > > > > > > > > > >> - Implement response parser: > >> > > > > > > > > > >> - Extract structured data (JSONPath, XPath, > regex) > >> > > > > > > > > > >> - Map results back to original records by index > >> > > > > > > > > > >> - Handle missing or malformed responses > >> > > > > > > > > > >> - Maintain record ordering and error attribution > >> > > > > > > > > > >> - Support parameter override mechanism in > ML_PREDICT > >> > > > function > >> > > > > > > > > signature > >> > > > > > > > > > >> > >> > > > > > > > > > >> ### Response Parsing Strategy > >> > > > > > > > > > >> > >> > > > > > > > > > >> The implementation must handle: > >> > > > > > > > > > >> 1. **Successful batch response**: Parse and > >> distribute > >> > > > results > >> > > > > > > > > > >> 2. **Partial failure**: Some records missing from > >> > > response → > >> > > > > > emit > >> > > > > > > > > errors > >> > > > > > > > > > >> for those > >> > > > > > > > > > >> 3. **Complete parse failure**: Optionally fallback > to > >> > > > > individual > >> > > > > > > > calls > >> > > > > > > > > > >> 4. **Index mismatch**: Response indices don't match > >> > input > >> > > → > >> > > > > log > >> > > > > > > > > warning > >> > > > > > > > > > >> and best-effort match > >> > > > > > > > > > >> 5. 
**Malformed JSON**: Retry with error handling > >> > > > > > > > > > >> > >> > > > > > > > > > >> Example error handling: > >> > > > > > > > > > >> ```sql > >> > > > > > > > > > >> -- Records that fail parsing get null results with > >> error > >> > > > > > metadata > >> > > > > > > > > > >> SELECT > >> > > > > > > > > > >> id, > >> > > > > > > > > > >> text, > >> > > > > > > > > > >> result.value as sentiment, > >> > > > > > > > > > >> result.error as error_msg > >> > > > > > > > > > >> FROM source_table, > >> > > > > > > > > > >> LATERAL TABLE(ML_PREDICT('llm_sentiment', text)); > >> > > > > > > > > > >> ``` > >> > > > > > > > > > >> > >> > > > > > > > > > >> ## Limitations and Considerations > >> > > > > > > > > > >> > >> > > > > > > > > > >> 1. **LLM instruction following**: Depends on > model's > >> > > ability > >> > > > > to > >> > > > > > > > follow > >> > > > > > > > > > >> structured > >> > > > > > > > > > >> output instructions. GPT-4 and Claude are > >> reliable; > >> > > older > >> > > > > > > models > >> > > > > > > > > may > >> > > > > > > > > > >> struggle. > >> > > > > > > > > > >> > >> > > > > > > > > > >> 2. **Prompt size limits**: Batching too many > records > >> may > >> > > > > exceed > >> > > > > > > > > context > >> > > > > > > > > > >> windows > >> > > > > > > > > > >> - GPT-4: ~8K tokens input limit > >> > > > > > > > > > >> - Claude: ~200K tokens but practical batches > >> smaller > >> > > > > > > > > > >> - Need configurable max batch size based on > >> average > >> > > > record > >> > > > > > > length > >> > > > > > > > > > >> > >> > > > > > > > > > >> 3. **Token cost trade-off**: Larger batches mean: > >> > > > > > > > > > >> - Fewer API calls (good) > >> > > > > > > > > > >> - But larger prompts with > instructions/formatting > >> > > (slight > >> > > > > > > > overhead) > >> > > > > > > > > > >> - Net savings still 80-90% in practice > >> > > > > > > > > > >> > >> > > > > > > > > > >> 4. **Parsing reliability**: Small risk of malformed > >> > > > responses > >> > > > > > > > > > >> - Mitigated by: clear instructions, JSON mode > >> > (GPT-4), > >> > > > > retry > >> > > > > > > > logic > >> > > > > > > > > > >> - Fallback to individual calls if batch parsing > >> fails > >> > > > > > > repeatedly > >> > > > > > > > > > >> > >> > > > > > > > > > >> 5. **Latency characteristics**: > >> > > > > > > > > > >> - Individual records see slightly higher latency > >> > > (waiting > >> > > > > for > >> > > > > > > > > batch) > >> > > > > > > > > > >> - Overall throughput dramatically improved > >> > > > > > > > > > >> - Use `batch.timeout.ms` to balance latency vs > >> > > > throughput > >> > > > > > > > > > >> > >> > > > > > > > > > >> ## Future Extensions > >> > > > > > > > > > >> > >> > > > > > > > > > >> This batching architecture would support: > >> > > > > > > > > > >> 1. **Stateful chat sessions**: Batch multiple turns > >> of a > >> > > > > > > > conversation > >> > > > > > > > > > >> with > >> > > > > > > > > > >> maintained history per session key > >> > > > > > > > > > >> 2. **Embedding generation**: Some providers > (OpenAI) > >> do > >> > > have > >> > > > > > batch > >> > > > > > > > > > >> embedding APIs > >> > > > > > > > > > >> 3. **Multi-modal batching**: Batch image + text > >> > processing > >> > > > > with > >> > > > > > > > > > >> structured outputs > >> > > > > > > > > > >> > >> > > > > > > > > > >> ## Questions for the Community > >> > > > > > > > > > >> > >> > > > > > > > > > >> 1. 
**Architecture**: Should this extend ML_PREDICT > or > >> > be a > >> > > > new > >> > > > > > > > > function? > >> > > > > > > > > > >> (I propose extending ML_PREDICT for backward > >> > > > compatibility) > >> > > > > > > > > > >> > >> > > > > > > > > > >> 2. **FLIP Required?**: Does this enhancement > warrant > >> a > >> > > FLIP? > >> > > > > > > > > > >> > >> > > > > > > > > > >> 3. **Existing Work**: Is anyone working on batching > >> for > >> > > > > > ML_PREDICT > >> > > > > > > > or > >> > > > > > > > > > >> similar > >> > > > > > > > > > >> functionality? > >> > > > > > > > > > >> > >> > > > > > > > > > >> 4. **Prompt Template Engine**: Should we: > >> > > > > > > > > > >> - Build a custom template engine? > >> > > > > > > > > > >> - Use existing library (e.g., StringTemplate, > >> > > Mustache)? > >> > > > > > > > > > >> - Keep it simple with String.format initially? > >> > > > > > > > > > >> > >> > > > > > > > > > >> 5. **Response Parsing**: Preferred approach: > >> > > > > > > > > > >> - JSONPath library (flexible but adds > dependency) > >> > > > > > > > > > >> - Simple JSON parsing with field names > >> > > > > > > > > > >> - Pluggable parser interface for extensibility? > >> > > > > > > > > > >> > >> > > > > > > > > > >> 6. **Error Handling**: If parsing fails for entire > >> > batch: > >> > > > > > > > > > >> - Fail all records in batch? > >> > > > > > > > > > >> - Retry batch once more? > >> > > > > > > > > > >> - Fallback to individual calls (with circuit > >> > breaker)? > >> > > > > > > > > > >> - Make strategy configurable? > >> > > > > > > > > > >> > >> > > > > > > > > > >> 7. **Batch Assembly**: Should batching happen: > >> > > > > > > > > > >> - Per parallel instance (each task maintains its > >> own > >> > > > > batch)? > >> > > > > > > > > > >> - Globally coordinated (shuffle to batch > >> > coordinator)? > >> > > > > > > > > > >> - I propose per-instance for simplicity and > lower > >> > > latency > >> > > > > > > > > > >> > >> > > > > > > > > > >> 8. **Compatibility**: Default batch.size=1 to > >> maintain > >> > > > current > >> > > > > > > > > behavior, > >> > > > > > > > > > >> users > >> > > > > > > > > > >> opt-in to batching? > >> > > > > > > > > > >> > >> > > > > > > > > > >> ## Why This Matters > >> > > > > > > > > > >> > >> > > > > > > > > > >> LLM inference is becoming a critical part of > >> real-time > >> > > data > >> > > > > > > > pipelines. > >> > > > > > > > > > >> Without > >> > > > > > > > > > >> batching: > >> > > > > > > > > > >> - Users face prohibitive costs for high-throughput > >> > > workloads > >> > > > > > > > > > >> - Rate limits block production deployments > >> > > > > > > > > > >> - Latency makes real-time processing impractical > >> > > > > > > > > > >> > >> > > > > > > > > > >> While LLM providers don't offer native batch APIs, > >> > > > > > > application-level > >> > > > > > > > > > >> batching > >> > > > > > > > > > >> through prompt engineering is a proven pattern used > >> in > >> > > > > > production > >> > > > > > > by > >> > > > > > > > > > many > >> > > > > > > > > > >> organizations. This proposal brings that capability > >> > > natively > >> > > > > > into > >> > > > > > > > > Flink. 
> >> > > > > > > > > > >> > >> > > > > > > > > > >> The hybrid configuration approach provides: > >> > > > > > > > > > >> - **Sensible defaults** for common use cases > >> (sentiment > >> > > > > > analysis, > >> > > > > > > > > > >> classification) > >> > > > > > > > > > >> - **Flexibility** to customize prompts and parsing > >> for > >> > > > > specific > >> > > > > > > > needs > >> > > > > > > > > > >> - **Easy migration** for existing queries > >> (batch.size=1 > >> > > > > default) > >> > > > > > > > > > >> > >> > > > > > > > > > >> ## Next Steps > >> > > > > > > > > > >> > >> > > > > > > > > > >> If there's interest from the community, I'm happy > to: > >> > > > > > > > > > >> 1. Prepare a detailed design document with prompt > >> > > templates > >> > > > > and > >> > > > > > > > > parsing > >> > > > > > > > > > >> examples > >> > > > > > > > > > >> 2. Create a JIRA ticket > >> > > > > > > > > > >> 3. Develop a prototype demonstrating the batching > and > >> > > > parsing > >> > > > > > > logic > >> > > > > > > > > > >> 4. Write a FLIP if required > >> > > > > > > > > > >> > >> > > > > > > > > > >> Looking forward to your feedback and guidance on > how > >> > best > >> > > to > >> > > > > > > > > proceed!-- > >> > > > > > > > > > >> Thanks And Regards > >> > > > > > > > > > >> Rahul > >> > > > > > > > > > >> > >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > -- > >> > > > > > > > > > > Thanks And Regards > >> > > > > > > > > > > Rahul > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > -- > >> > > > > > > > > > Thanks And Regards > >> > > > > > > > > > Rahul > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > > >> > > > -- > >> > > > Thanks And Regards > >> > > > Rahul > >> > > > > >> > > > >> > > >> > > >> > -- > >> > Thanks And Regards > >> > Rahul > >> > > >> > > > > > > -- > > Thanks And Regards > > Rahul > > > > > -- > Thanks And Regards > Rahul >
