Hi,
I actually thought of reworking my previous response. I want the table api
to create jsonl files and call openai/claude batch apis.
The implementation I am doing is going to batch the records into a file and
call the api with the file and then continuously poll the repose to see the
status of the batch and then use that to write the response records.
The ML_Predict in its current form is not usable as people are not looking
for synchronous response which is twice as expensive as the asynchronous
response.
let me know you thoughts and i can create a FLIP for it
regards

On Sat, Nov 8, 2025 at 3:14 PM Rahul Bhattacharya <[email protected]>
wrote:

> Hi Flink Community,
>
> I'm interested in contributing an enhancement to Apache Flink's ML_PREDICT
> functionality for LLM interactions. I'd like to gauge community interest
> and get
> early feedback before proceeding with detailed design or a FLIP.
>
> ## Problem Statement
>
> Currently, when using Flink SQL's ML_PREDICT with LLM endpoints, each
> record
> triggers an individual API call. For a stream processing 1000
> records/second,
> this results in:
>
> - **1000 separate API calls per second**
> - **High latency**: Each call has network overhead + API processing time
> - **High cost**: Most LLM providers charge per token, and lack of batching
> means
>   no cost optimization
> - **Rate limiting issues**: Hitting provider rate limits quickly
> - **Poor throughput**: API calls are serialized per record
>
> ### Current Behavior (Inefficient)
> ```sql
> -- This makes 10 individual API calls
> SELECT id, ML_PREDICT('llm_model', text) as result
> FROM (VALUES
>     (1, 'text1'), (2, 'text2'), ..., (10, 'text10')
> ) AS t(id, text);
> ```
> **Result**: 10 separate HTTP requests, 10x latency, 10x overhead
>
> ## Proposed Solution: Application-Level Batching with Prompt Engineering
>
> Since most LLM APIs (OpenAI, Anthropic Claude, etc.) don't provide native
> batch
> endpoints, we propose implementing batching at the application level by:
>
> 1. **Accumulating N records** into a single batch
> 2. **Injecting records into a structured prompt** that instructs the LLM
> to
>    process multiple items
> 3. **Parsing structured responses** to extract results for each record
> 4. **Emitting individual results** back to the Flink pipeline
>
> ### How It Works
>
> **Step 1: Batch Accumulation**
> Collect up to `batch.size` records or wait up to `batch.timeout.ms`
>
> **Step 2: Prompt Construction**
>
> System: You are a sentiment analyzer. Process each item and respond with
> JSON.
>
> User: Analyze the sentiment of these texts. Return a JSON array with one
> object per input containing "index" and "sentiment" fields.
>
> Input 1: "This product is amazing!" Input 2: "Terrible experience, very
> disappointed" Input 3: "It's okay, nothing special" ... Input 10: "Best
> purchase ever!"
>
> Respond with: [{"index": 1, "sentiment": "..."}, {"index": 2, "sentiment":
> "..."}, ...]
>
> **Step 3: Response Parsing**
> ```json
> [
>   {"index": 1, "sentiment": "positive"},
>   {"index": 2, "sentiment": "negative"},
>   {"index": 3, "sentiment": "neutral"},
>   ...
>   {"index": 10, "sentiment": "positive"}
> ]
> ```
>
> **Step 4: Result Distribution**
> Parse JSON and emit individual results back to corresponding records
>
> ### Model Configuration (Defaults)
> ```sql
> CREATE MODEL llm_sentiment WITH (
>     'provider' = 'openai',
>     'model' = 'gpt-4',
>     'api_key' = '${API_KEY}',
>     'batch.size' = '20',
>     'batch.timeout.ms' = '1000',
>     'system.prompt' = 'You are a sentiment analyzer. Always respond with
> valid JSON.',
>     'batch.prompt.template' = 'Analyze sentiment for these texts. Return
> JSON array: [{"index": <n>, "sentiment": "<positive|negative|neutral>"}]',
>     'response.format' = 'json',
>     'response.path' = '$[*]',  -- JSONPath to extract array of results
>     'response.index.field' = 'index',  -- Field containing record index
>     'response.value.field' = 'sentiment'  -- Field containing result
> );
> ```
>
> ### Query Usage (Use Defaults)
> ```sql
> -- Uses batch_size=20 from model definition
> SELECT id, text, ML_PREDICT('llm_sentiment', text) as sentiment
> FROM customer_reviews;
> ```
>
> ### Query Usage (Override for Custom Analysis)
> ```sql
> -- Override prompt and batch size for different use case
> SELECT id, text, ML_PREDICT('llm_sentiment', text,
>     MAP['batch.size', '50',
>         'batch.prompt.template', 'Extract key entities. Return JSON:
> [{"index": <n>, "entities": [...]}]',
>         'response.value.field', 'entities']) as entities
> FROM documents;
> ```
>
> ## Performance and Cost Impact
>
> ### Example: Processing 10,000 customer reviews
>
> **Current (unbatched)**:
> - 10,000 API calls
> - ~10,000 x 200ms latency = 2,000 seconds total processing time
> (serialized)
> - ~10,000 x $0.002 = $20 in API costs
> - High rate limit pressure
>
> **With batching (batch_size=20)**:
> - 500 API calls (10,000 / 20)
> - ~500 x 300ms latency = 150 seconds total processing time
> - ~500 x $0.006 = $3 in API costs (slightly higher per call due to larger
> prompts,
>   but still 85% cheaper overall)
> - **20x fewer API calls**
> - **13x faster processing**
> - **85% cost reduction**
>
> ## Proposed Implementation
>
> ### Configuration Parameters
>
> **Model-level (defaults)**:
> - `batch.size`: Maximum records per batch (default: 1 for backward
> compatibility)
> - `batch.timeout.ms`: Max time to wait before flushing incomplete batch
> (default: 1000ms)
> - `system.prompt`: System-level instruction for the LLM
> - `batch.prompt.template`: Template explaining how to process batched
> inputs
> - `response.format`: Expected response format ('json', 'xml', 'delimited')
> - `response.path`: JSONPath or XPath to extract results array
> - `response.index.field`: Field name containing the record index
> - `response.value.field`: Field name containing the actual result
> - `max.retries`: Retry attempts for failed batches (default: 3)
> - `request.timeout.ms`: Timeout for API calls (default: 30000ms)
>
> **Query-level (overrides)**:
> - Any of the above can be overridden via MAP parameter in ML_PREDICT
> - Per-query customization for different analysis tasks
>
> ### Key Features
> 1. **Prompt injection**: Automatically construct batch prompts with
> indexed inputs
> 2. **Structured response parsing**: Support JSON, XML, or delimited formats
> 3. **Index tracking**: Maintain record-to-result mapping through the batch
> 4. **Error handling**: Handle parsing failures, missing indices, malformed
> responses
> 5. **Fallback to individual calls**: If batch fails, optionally retry
> records individually
> 6. **Provider-agnostic**: Works with any LLM API (OpenAI, Anthropic,
> Azure, self-hosted)
> 7. **Async processing**: Non-blocking batch requests
> 8. **Back-pressure**: Proper flow control when API is slow
> 9. **Backward compatible**: batch.size=1 maintains current behavior
>
> ### Technical Approach
> - Extend existing ML_PREDICT infrastructure
> - Add batching buffer in the ML_PREDICT operator
> - Implement prompt template engine for batch construction:
>   - Inject record index + content into template
>   - Support various templating formats (JSON, XML, plain text)
> - Implement response parser:
>   - Extract structured data (JSONPath, XPath, regex)
>   - Map results back to original records by index
>   - Handle missing or malformed responses
> - Maintain record ordering and error attribution
> - Support parameter override mechanism in ML_PREDICT function signature
>
> ### Response Parsing Strategy
>
> The implementation must handle:
> 1. **Successful batch response**: Parse and distribute results
> 2. **Partial failure**: Some records missing from response → emit errors
> for those
> 3. **Complete parse failure**: Optionally fallback to individual calls
> 4. **Index mismatch**: Response indices don't match input → log warning
> and best-effort match
> 5. **Malformed JSON**: Retry with error handling
>
> Example error handling:
> ```sql
> -- Records that fail parsing get null results with error metadata
> SELECT
>     id,
>     text,
>     result.value as sentiment,
>     result.error as error_msg
> FROM source_table,
> LATERAL TABLE(ML_PREDICT('llm_sentiment', text));
> ```
>
> ## Limitations and Considerations
>
> 1. **LLM instruction following**: Depends on model's ability to follow
> structured
>    output instructions. GPT-4 and Claude are reliable; older models may
> struggle.
>
> 2. **Prompt size limits**: Batching too many records may exceed context
> windows
>    - GPT-4: ~8K tokens input limit
>    - Claude: ~200K tokens but practical batches smaller
>    - Need configurable max batch size based on average record length
>
> 3. **Token cost trade-off**: Larger batches mean:
>    - Fewer API calls (good)
>    - But larger prompts with instructions/formatting (slight overhead)
>    - Net savings still 80-90% in practice
>
> 4. **Parsing reliability**: Small risk of malformed responses
>    - Mitigated by: clear instructions, JSON mode (GPT-4), retry logic
>    - Fallback to individual calls if batch parsing fails repeatedly
>
> 5. **Latency characteristics**:
>    - Individual records see slightly higher latency (waiting for batch)
>    - Overall throughput dramatically improved
>    - Use `batch.timeout.ms` to balance latency vs throughput
>
> ## Future Extensions
>
> This batching architecture would support:
> 1. **Stateful chat sessions**: Batch multiple turns of a conversation with
>    maintained history per session key
> 2. **Embedding generation**: Some providers (OpenAI) do have batch
> embedding APIs
> 3. **Multi-modal batching**: Batch image + text processing with structured
> outputs
>
> ## Questions for the Community
>
> 1. **Architecture**: Should this extend ML_PREDICT or be a new function?
>    (I propose extending ML_PREDICT for backward compatibility)
>
> 2. **FLIP Required?**: Does this enhancement warrant a FLIP?
>
> 3. **Existing Work**: Is anyone working on batching for ML_PREDICT or
> similar
>    functionality?
>
> 4. **Prompt Template Engine**: Should we:
>    - Build a custom template engine?
>    - Use existing library (e.g., StringTemplate, Mustache)?
>    - Keep it simple with String.format initially?
>
> 5. **Response Parsing**: Preferred approach:
>    - JSONPath library (flexible but adds dependency)
>    - Simple JSON parsing with field names
>    - Pluggable parser interface for extensibility?
>
> 6. **Error Handling**: If parsing fails for entire batch:
>    - Fail all records in batch?
>    - Retry batch once more?
>    - Fallback to individual calls (with circuit breaker)?
>    - Make strategy configurable?
>
> 7. **Batch Assembly**: Should batching happen:
>    - Per parallel instance (each task maintains its own batch)?
>    - Globally coordinated (shuffle to batch coordinator)?
>    - I propose per-instance for simplicity and lower latency
>
> 8. **Compatibility**: Default batch.size=1 to maintain current behavior,
> users
>    opt-in to batching?
>
> ## Why This Matters
>
> LLM inference is becoming a critical part of real-time data pipelines.
> Without
> batching:
> - Users face prohibitive costs for high-throughput workloads
> - Rate limits block production deployments
> - Latency makes real-time processing impractical
>
> While LLM providers don't offer native batch APIs, application-level
> batching
> through prompt engineering is a proven pattern used in production by many
> organizations. This proposal brings that capability natively into Flink.
>
> The hybrid configuration approach provides:
> - **Sensible defaults** for common use cases (sentiment analysis,
> classification)
> - **Flexibility** to customize prompts and parsing for specific needs
> - **Easy migration** for existing queries (batch.size=1 default)
>
> ## Next Steps
>
> If there's interest from the community, I'm happy to:
> 1. Prepare a detailed design document with prompt templates and parsing
> examples
> 2. Create a JIRA ticket
> 3. Develop a prototype demonstrating the batching and parsing logic
> 4. Write a FLIP if required
>
> Looking forward to your feedback and guidance on how best to proceed!--
> Thanks And Regards
> Rahul
>


-- 
Thanks And Regards
Rahul

Reply via email to