Hi, I have created a draft FLIP for it: https://docs.google.com/document/d/1U-eSuKwi5vIgAPt6ZBvb-RcbcRJiRy0e/
Please let me know your thoughts.

Regards,
Rahul

On Sat, Nov 8, 2025 at 5:54 PM Rahul Bhattacharya <[email protected]> wrote:

Hi,

I have actually decided to rework my previous proposal: I want the Table API to create JSONL files and call the OpenAI/Claude batch APIs. The implementation I am working on batches the records into a file, submits the file to the API, continuously polls for the status of the batch, and then uses the completed batch to write the response records. ML_PREDICT in its current form is not usable for this, because people are not looking for a synchronous response that is twice as expensive as the asynchronous one. Let me know your thoughts and I can create a FLIP for it.

Regards

On Sat, Nov 8, 2025 at 3:14 PM Rahul Bhattacharya <[email protected]> wrote:

Hi Flink Community,

I'm interested in contributing an enhancement to Apache Flink's ML_PREDICT functionality for LLM interactions. I'd like to gauge community interest and get early feedback before proceeding with a detailed design or a FLIP.

## Problem Statement

Currently, when using Flink SQL's ML_PREDICT with LLM endpoints, each record triggers an individual API call. For a stream processing 1000 records/second, this results in:

- **1000 separate API calls per second**
- **High latency**: Each call has network overhead + API processing time
- **High cost**: Most LLM providers charge per token, and the lack of batching means no cost optimization
- **Rate limiting issues**: Provider rate limits are hit quickly
- **Poor throughput**: API calls are serialized per record

### Current Behavior (Inefficient)
```sql
-- This makes 10 individual API calls
SELECT id, ML_PREDICT('llm_model', text) as result
FROM (VALUES
  (1, 'text1'), (2, 'text2'), ..., (10, 'text10')
) AS t(id, text);
```
**Result**: 10 separate HTTP requests, 10x latency, 10x overhead

## Proposed Solution: Application-Level Batching with Prompt Engineering

Since most LLM APIs (OpenAI, Anthropic Claude, etc.) don't provide native batch endpoints, we propose implementing batching at the application level by:

1. **Accumulating N records** into a single batch
2. **Injecting records into a structured prompt** that instructs the LLM to process multiple items
3. **Parsing structured responses** to extract results for each record
4. **Emitting individual results** back to the Flink pipeline

### How It Works

**Step 1: Batch Accumulation**
Collect up to `batch.size` records or wait up to `batch.timeout.ms`.

**Step 2: Prompt Construction**

System: You are a sentiment analyzer. Process each item and respond with JSON.

User: Analyze the sentiment of these texts. Return a JSON array with one object per input containing "index" and "sentiment" fields.

Input 1: "This product is amazing!"
Input 2: "Terrible experience, very disappointed"
Input 3: "It's okay, nothing special"
...
Input 10: "Best purchase ever!"

Respond with: [{"index": 1, "sentiment": "..."}, {"index": 2, "sentiment": "..."}, ...]

**Step 3: Response Parsing**
```json
[
  {"index": 1, "sentiment": "positive"},
  {"index": 2, "sentiment": "negative"},
  {"index": 3, "sentiment": "neutral"},
  ...
  {"index": 10, "sentiment": "positive"}
]
```

**Step 4: Result Distribution**
Parse the JSON and emit individual results back to the corresponding records.
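To make Steps 2-4 concrete, here is a minimal sketch of the prompt assembly and the index-based response mapping, assuming Jackson for JSON handling. The class and method names (`BatchPromptCodec`, `buildPrompt`, `parseResponse`) are illustrative placeholders, not an existing Flink API.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative helper: builds the batched prompt and maps the LLM's JSON reply back by index. */
public class BatchPromptCodec {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Step 2: inject the accumulated records into a single indexed prompt. */
    public static String buildPrompt(String batchInstruction, List<String> inputs) {
        // In the real operator the instruction text would come from 'batch.prompt.template'.
        StringBuilder prompt = new StringBuilder(batchInstruction).append("\n\n");
        for (int i = 0; i < inputs.size(); i++) {
            // 1-based indices so the model's output can be matched back to the records.
            prompt.append("Input ").append(i + 1).append(": \"")
                  .append(inputs.get(i)).append("\"\n");
        }
        prompt.append("\nRespond with a JSON array of {\"index\": <n>, \"sentiment\": \"...\"} objects.");
        return prompt.toString();
    }

    /** Steps 3-4: parse the JSON array and return index -> result for distribution. */
    public static Map<Integer, String> parseResponse(
            String llmResponse, String indexField, String valueField) throws Exception {
        JsonNode root = MAPPER.readTree(llmResponse);
        Map<Integer, String> resultsByIndex = new HashMap<>();
        if (root.isArray()) {
            for (JsonNode item : root) {
                // Entries with missing fields are skipped; the operator would emit errors for them.
                if (item.hasNonNull(indexField) && item.hasNonNull(valueField)) {
                    resultsByIndex.put(item.get(indexField).asInt(), item.get(valueField).asText());
                }
            }
        }
        return resultsByIndex;
    }
}
```

Keeping the index explicit in both the prompt and the response is what lets the operator re-associate results with the original records even if the model returns them out of order; records missing from the returned map would get a null result with error metadata, as described under the Response Parsing Strategy below.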
>> {"index": 10, "sentiment": "positive"} >> ] >> ``` >> >> **Step 4: Result Distribution** >> Parse JSON and emit individual results back to corresponding records >> >> ### Model Configuration (Defaults) >> ```sql >> CREATE MODEL llm_sentiment WITH ( >> 'provider' = 'openai', >> 'model' = 'gpt-4', >> 'api_key' = '${API_KEY}', >> 'batch.size' = '20', >> 'batch.timeout.ms' = '1000', >> 'system.prompt' = 'You are a sentiment analyzer. Always respond with >> valid JSON.', >> 'batch.prompt.template' = 'Analyze sentiment for these texts. Return >> JSON array: [{"index": <n>, "sentiment": "<positive|negative|neutral>"}]', >> 'response.format' = 'json', >> 'response.path' = '$[*]', -- JSONPath to extract array of results >> 'response.index.field' = 'index', -- Field containing record index >> 'response.value.field' = 'sentiment' -- Field containing result >> ); >> ``` >> >> ### Query Usage (Use Defaults) >> ```sql >> -- Uses batch_size=20 from model definition >> SELECT id, text, ML_PREDICT('llm_sentiment', text) as sentiment >> FROM customer_reviews; >> ``` >> >> ### Query Usage (Override for Custom Analysis) >> ```sql >> -- Override prompt and batch size for different use case >> SELECT id, text, ML_PREDICT('llm_sentiment', text, >> MAP['batch.size', '50', >> 'batch.prompt.template', 'Extract key entities. Return JSON: >> [{"index": <n>, "entities": [...]}]', >> 'response.value.field', 'entities']) as entities >> FROM documents; >> ``` >> >> ## Performance and Cost Impact >> >> ### Example: Processing 10,000 customer reviews >> >> **Current (unbatched)**: >> - 10,000 API calls >> - ~10,000 x 200ms latency = 2,000 seconds total processing time >> (serialized) >> - ~10,000 x $0.002 = $20 in API costs >> - High rate limit pressure >> >> **With batching (batch_size=20)**: >> - 500 API calls (10,000 / 20) >> - ~500 x 300ms latency = 150 seconds total processing time >> - ~500 x $0.006 = $3 in API costs (slightly higher per call due to larger >> prompts, >> but still 85% cheaper overall) >> - **20x fewer API calls** >> - **13x faster processing** >> - **85% cost reduction** >> >> ## Proposed Implementation >> >> ### Configuration Parameters >> >> **Model-level (defaults)**: >> - `batch.size`: Maximum records per batch (default: 1 for backward >> compatibility) >> - `batch.timeout.ms`: Max time to wait before flushing incomplete batch >> (default: 1000ms) >> - `system.prompt`: System-level instruction for the LLM >> - `batch.prompt.template`: Template explaining how to process batched >> inputs >> - `response.format`: Expected response format ('json', 'xml', 'delimited') >> - `response.path`: JSONPath or XPath to extract results array >> - `response.index.field`: Field name containing the record index >> - `response.value.field`: Field name containing the actual result >> - `max.retries`: Retry attempts for failed batches (default: 3) >> - `request.timeout.ms`: Timeout for API calls (default: 30000ms) >> >> **Query-level (overrides)**: >> - Any of the above can be overridden via MAP parameter in ML_PREDICT >> - Per-query customization for different analysis tasks >> >> ### Key Features >> 1. **Prompt injection**: Automatically construct batch prompts with >> indexed inputs >> 2. **Structured response parsing**: Support JSON, XML, or delimited >> formats >> 3. **Index tracking**: Maintain record-to-result mapping through the batch >> 4. **Error handling**: Handle parsing failures, missing indices, >> malformed responses >> 5. 
### Key Features
1. **Prompt injection**: Automatically construct batch prompts with indexed inputs
2. **Structured response parsing**: Support JSON, XML, or delimited formats
3. **Index tracking**: Maintain record-to-result mapping through the batch
4. **Error handling**: Handle parsing failures, missing indices, malformed responses
5. **Fallback to individual calls**: If a batch fails, optionally retry records individually
6. **Provider-agnostic**: Works with any LLM API (OpenAI, Anthropic, Azure, self-hosted)
7. **Async processing**: Non-blocking batch requests
8. **Back-pressure**: Proper flow control when the API is slow
9. **Backward compatible**: batch.size=1 maintains current behavior

### Technical Approach
- Extend the existing ML_PREDICT infrastructure
- Add a batching buffer in the ML_PREDICT operator
- Implement a prompt template engine for batch construction:
  - Inject record index + content into the template
  - Support various templating formats (JSON, XML, plain text)
- Implement a response parser:
  - Extract structured data (JSONPath, XPath, regex)
  - Map results back to the original records by index
  - Handle missing or malformed responses
  - Maintain record ordering and error attribution
- Support a parameter override mechanism in the ML_PREDICT function signature

### Response Parsing Strategy

The implementation must handle:
1. **Successful batch response**: Parse and distribute results
2. **Partial failure**: Some records missing from the response → emit errors for those
3. **Complete parse failure**: Optionally fall back to individual calls
4. **Index mismatch**: Response indices don't match the input → log a warning and best-effort match
5. **Malformed JSON**: Retry with error handling

Example error handling:
```sql
-- Records that fail parsing get null results with error metadata
SELECT
  id,
  text,
  result.value as sentiment,
  result.error as error_msg
FROM source_table,
LATERAL TABLE(ML_PREDICT('llm_sentiment', text));
```

## Limitations and Considerations

1. **LLM instruction following**: Depends on the model's ability to follow structured output instructions. GPT-4 and Claude are reliable; older models may struggle.

2. **Prompt size limits**: Batching too many records may exceed context windows
   - GPT-4: ~8K tokens input limit
   - Claude: ~200K tokens, but practical batches are smaller
   - Need a configurable max batch size based on average record length (a rough sketch follows after this list)

3. **Token cost trade-off**: Larger batches mean:
   - Fewer API calls (good)
   - But larger prompts with instructions/formatting (slight overhead)
   - Net savings are still 80-90% in practice

4. **Parsing reliability**: Small risk of malformed responses
   - Mitigated by: clear instructions, JSON mode (GPT-4), retry logic
   - Fallback to individual calls if batch parsing fails repeatedly

5. **Latency characteristics**:
   - Individual records see slightly higher latency (waiting for the batch)
   - Overall throughput is dramatically improved
   - Use `batch.timeout.ms` to balance latency vs throughput
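To illustrate the second limitation, here is a rough sketch of capping a batch by an approximate token budget rather than a fixed record count. The 4-characters-per-token heuristic and the `TokenBudgetBatcher` name are assumptions for illustration only, not part of the proposed API.

```java
import java.util.List;

/** Rough illustration: cap a batch by an approximate token budget instead of a fixed record count. */
public class TokenBudgetBatcher {

    // Crude heuristic: roughly 4 characters per token for English text (assumption, provider-dependent).
    private static final int APPROX_CHARS_PER_TOKEN = 4;

    /** Returns how many of the pending records fit into the prompt's token budget. */
    public static int recordsThatFit(List<String> pending, int promptOverheadTokens, int maxPromptTokens) {
        int usedTokens = promptOverheadTokens;  // instructions, formatting, few-shot examples
        int count = 0;
        for (String record : pending) {
            int recordTokens = Math.max(1, record.length() / APPROX_CHARS_PER_TOKEN);
            if (usedTokens + recordTokens > maxPromptTokens) {
                break;  // adding this record would exceed the context window
            }
            usedTokens += recordTokens;
            count++;
        }
        return count;
    }
}
```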
## Future Extensions

This batching architecture would support:
1. **Stateful chat sessions**: Batch multiple turns of a conversation with maintained history per session key
2. **Embedding generation**: Some providers (OpenAI) do have batch embedding APIs
3. **Multi-modal batching**: Batch image + text processing with structured outputs

## Questions for the Community

1. **Architecture**: Should this extend ML_PREDICT or be a new function? (I propose extending ML_PREDICT for backward compatibility)

2. **FLIP Required?**: Does this enhancement warrant a FLIP?

3. **Existing Work**: Is anyone working on batching for ML_PREDICT or similar functionality?

4. **Prompt Template Engine**: Should we:
   - Build a custom template engine?
   - Use an existing library (e.g., StringTemplate, Mustache)?
   - Keep it simple with String.format initially?

5. **Response Parsing**: Preferred approach:
   - JSONPath library (flexible but adds a dependency)
   - Simple JSON parsing with field names
   - Pluggable parser interface for extensibility?

6. **Error Handling**: If parsing fails for the entire batch:
   - Fail all records in the batch?
   - Retry the batch once more?
   - Fall back to individual calls (with a circuit breaker)?
   - Make the strategy configurable?

7. **Batch Assembly**: Should batching happen:
   - Per parallel instance (each task maintains its own batch)?
   - Globally coordinated (shuffle to a batch coordinator)?
   - I propose per-instance for simplicity and lower latency

8. **Compatibility**: Default batch.size=1 to maintain current behavior, with users opting in to batching?

## Why This Matters

LLM inference is becoming a critical part of real-time data pipelines. Without batching:
- Users face prohibitive costs for high-throughput workloads
- Rate limits block production deployments
- Latency makes real-time processing impractical

While LLM providers don't offer native batch APIs, application-level batching through prompt engineering is a proven pattern used in production by many organizations. This proposal brings that capability natively into Flink.

The hybrid configuration approach provides:
- **Sensible defaults** for common use cases (sentiment analysis, classification)
- **Flexibility** to customize prompts and parsing for specific needs
- **Easy migration** for existing queries (batch.size=1 default)

## Next Steps

If there's interest from the community, I'm happy to:
1. Prepare a detailed design document with prompt templates and parsing examples
2. Create a JIRA ticket
3. Develop a prototype demonstrating the batching and parsing logic
4. Write a FLIP if required

Looking forward to your feedback and guidance on how best to proceed!

--
Thanks And Regards
Rahul
