Hi Hao
For option 2 the guarantees like exactly once and other things are not
guaranteed as it’s not in the same flink process

The flink process finishes after the submission and getting batchids which
it sends downstream

There has to be another process(doesn’t have to be flink)  which takes
these batchids and polls the OpenAI endpoint for status completed.

Once it gets completed it downloads the results and sends downstream

This secondary process is on client discretion , for Kafka probably a http
sink connector or Kafka consumer

Thanks And Regards
Rahul


On Mon, Nov 10, 2025 at 6:45 PM Hao Li <[email protected]> wrote:

> Hi Rahul,
>
> Thanks for the proposal. From some offline discussion, the endpoint you
> have in mind to support is OpenAI batch API [1]. The doc states that "Each
> batch completes within 24 hours (and often more quickly)". With this
> context, I have some questions:
>
> 1. For design option 1, does the operator always wait for batch response
> until processing next batch? This can take 24 hours which isn't feasible
> for streaming job I think.
>
> 2. For design option 2, why it loses exact-once and have higher latency
> compared to 1?
>
> 3. Also for the public interface section, are the parameters in
> `ml_predict` config or in options when `create model`?
>
> Thanks,
> Hao
>
>
> [1] https://platform.openai.com/docs/guides/batch/batch-api
>
> On Mon, Nov 10, 2025 at 10:19 AM Asimansu Bera <[email protected]>
> wrote:
>
> > +1
> >
> > This proposal is needed for optimizing network calls and processing asyc.
> >
> > Thanks
> > Asimansu
> >
> > On Mon, Nov 10, 2025 at 4:04 AM Shengkai Fang <[email protected]> wrote:
> >
> > > Hi, Rahul.
> > >
> > > +1 for this proposal. However, due to my current workload, I'll need
> > until
> > > the end of this week to review it thoroughly.
> > >
> > > Best,
> > > Shengkai
> > >
> > > Rahul Bhattacharya <[email protected]> 于2025年11月9日周日 13:08写道:
> > >
> > > > Hi ,
> > > > i have created a draft FLIP for it
> > > >
> https://docs.google.com/document/d/1U-eSuKwi5vIgAPt6ZBvb-RcbcRJiRy0e/
> > > >
> > > > Please let me know your thoughts
> > > >
> > > > regards
> > > > Rahul
> > > >
> > > > On Sat, Nov 8, 2025 at 5:54 PM Rahul Bhattacharya <
> [email protected]
> > >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > > I actually thought of reworking my previous response. I want the
> > table
> > > > api
> > > > > to create jsonl files and call openai/claude batch apis.
> > > > > The implementation I am doing is going to batch the records into a
> > file
> > > > > and call the api with the file and then continuously poll the
> repose
> > to
> > > > see
> > > > > the status of the batch and then use that to write the response
> > > records.
> > > > > The ML_Predict in its current form is not usable as people are not
> > > > looking
> > > > > for synchronous response which is twice as expensive as the
> > > asynchronous
> > > > > response.
> > > > > let me know you thoughts and i can create a FLIP for it
> > > > > regards
> > > > >
> > > > > On Sat, Nov 8, 2025 at 3:14 PM Rahul Bhattacharya <
> > [email protected]
> > > >
> > > > > wrote:
> > > > >
> > > > >> Hi Flink Community,
> > > > >>
> > > > >> I'm interested in contributing an enhancement to Apache Flink's
> > > > >> ML_PREDICT
> > > > >> functionality for LLM interactions. I'd like to gauge community
> > > interest
> > > > >> and get
> > > > >> early feedback before proceeding with detailed design or a FLIP.
> > > > >>
> > > > >> ## Problem Statement
> > > > >>
> > > > >> Currently, when using Flink SQL's ML_PREDICT with LLM endpoints,
> > each
> > > > >> record
> > > > >> triggers an individual API call. For a stream processing 1000
> > > > >> records/second,
> > > > >> this results in:
> > > > >>
> > > > >> - **1000 separate API calls per second**
> > > > >> - **High latency**: Each call has network overhead + API
> processing
> > > time
> > > > >> - **High cost**: Most LLM providers charge per token, and lack of
> > > > >> batching means
> > > > >>   no cost optimization
> > > > >> - **Rate limiting issues**: Hitting provider rate limits quickly
> > > > >> - **Poor throughput**: API calls are serialized per record
> > > > >>
> > > > >> ### Current Behavior (Inefficient)
> > > > >> ```sql
> > > > >> -- This makes 10 individual API calls
> > > > >> SELECT id, ML_PREDICT('llm_model', text) as result
> > > > >> FROM (VALUES
> > > > >>     (1, 'text1'), (2, 'text2'), ..., (10, 'text10')
> > > > >> ) AS t(id, text);
> > > > >> ```
> > > > >> **Result**: 10 separate HTTP requests, 10x latency, 10x overhead
> > > > >>
> > > > >> ## Proposed Solution: Application-Level Batching with Prompt
> > > Engineering
> > > > >>
> > > > >> Since most LLM APIs (OpenAI, Anthropic Claude, etc.) don't provide
> > > > native
> > > > >> batch
> > > > >> endpoints, we propose implementing batching at the application
> level
> > > by:
> > > > >>
> > > > >> 1. **Accumulating N records** into a single batch
> > > > >> 2. **Injecting records into a structured prompt** that instructs
> the
> > > LLM
> > > > >> to
> > > > >>    process multiple items
> > > > >> 3. **Parsing structured responses** to extract results for each
> > record
> > > > >> 4. **Emitting individual results** back to the Flink pipeline
> > > > >>
> > > > >> ### How It Works
> > > > >>
> > > > >> **Step 1: Batch Accumulation**
> > > > >> Collect up to `batch.size` records or wait up to `
> batch.timeout.ms`
> > > > >>
> > > > >> **Step 2: Prompt Construction**
> > > > >>
> > > > >> System: You are a sentiment analyzer. Process each item and
> respond
> > > with
> > > > >> JSON.
> > > > >>
> > > > >> User: Analyze the sentiment of these texts. Return a JSON array
> with
> > > one
> > > > >> object per input containing "index" and "sentiment" fields.
> > > > >>
> > > > >> Input 1: "This product is amazing!" Input 2: "Terrible experience,
> > > very
> > > > >> disappointed" Input 3: "It's okay, nothing special" ... Input 10:
> > > "Best
> > > > >> purchase ever!"
> > > > >>
> > > > >> Respond with: [{"index": 1, "sentiment": "..."}, {"index": 2,
> > > > >> "sentiment": "..."}, ...]
> > > > >>
> > > > >> **Step 3: Response Parsing**
> > > > >> ```json
> > > > >> [
> > > > >>   {"index": 1, "sentiment": "positive"},
> > > > >>   {"index": 2, "sentiment": "negative"},
> > > > >>   {"index": 3, "sentiment": "neutral"},
> > > > >>   ...
> > > > >>   {"index": 10, "sentiment": "positive"}
> > > > >> ]
> > > > >> ```
> > > > >>
> > > > >> **Step 4: Result Distribution**
> > > > >> Parse JSON and emit individual results back to corresponding
> records
> > > > >>
> > > > >> ### Model Configuration (Defaults)
> > > > >> ```sql
> > > > >> CREATE MODEL llm_sentiment WITH (
> > > > >>     'provider' = 'openai',
> > > > >>     'model' = 'gpt-4',
> > > > >>     'api_key' = '${API_KEY}',
> > > > >>     'batch.size' = '20',
> > > > >>     'batch.timeout.ms' = '1000',
> > > > >>     'system.prompt' = 'You are a sentiment analyzer. Always
> respond
> > > with
> > > > >> valid JSON.',
> > > > >>     'batch.prompt.template' = 'Analyze sentiment for these texts.
> > > Return
> > > > >> JSON array: [{"index": <n>, "sentiment":
> > > > "<positive|negative|neutral>"}]',
> > > > >>     'response.format' = 'json',
> > > > >>     'response.path' = '$[*]',  -- JSONPath to extract array of
> > results
> > > > >>     'response.index.field' = 'index',  -- Field containing record
> > > index
> > > > >>     'response.value.field' = 'sentiment'  -- Field containing
> result
> > > > >> );
> > > > >> ```
> > > > >>
> > > > >> ### Query Usage (Use Defaults)
> > > > >> ```sql
> > > > >> -- Uses batch_size=20 from model definition
> > > > >> SELECT id, text, ML_PREDICT('llm_sentiment', text) as sentiment
> > > > >> FROM customer_reviews;
> > > > >> ```
> > > > >>
> > > > >> ### Query Usage (Override for Custom Analysis)
> > > > >> ```sql
> > > > >> -- Override prompt and batch size for different use case
> > > > >> SELECT id, text, ML_PREDICT('llm_sentiment', text,
> > > > >>     MAP['batch.size', '50',
> > > > >>         'batch.prompt.template', 'Extract key entities. Return
> JSON:
> > > > >> [{"index": <n>, "entities": [...]}]',
> > > > >>         'response.value.field', 'entities']) as entities
> > > > >> FROM documents;
> > > > >> ```
> > > > >>
> > > > >> ## Performance and Cost Impact
> > > > >>
> > > > >> ### Example: Processing 10,000 customer reviews
> > > > >>
> > > > >> **Current (unbatched)**:
> > > > >> - 10,000 API calls
> > > > >> - ~10,000 x 200ms latency = 2,000 seconds total processing time
> > > > >> (serialized)
> > > > >> - ~10,000 x $0.002 = $20 in API costs
> > > > >> - High rate limit pressure
> > > > >>
> > > > >> **With batching (batch_size=20)**:
> > > > >> - 500 API calls (10,000 / 20)
> > > > >> - ~500 x 300ms latency = 150 seconds total processing time
> > > > >> - ~500 x $0.006 = $3 in API costs (slightly higher per call due to
> > > > larger
> > > > >> prompts,
> > > > >>   but still 85% cheaper overall)
> > > > >> - **20x fewer API calls**
> > > > >> - **13x faster processing**
> > > > >> - **85% cost reduction**
> > > > >>
> > > > >> ## Proposed Implementation
> > > > >>
> > > > >> ### Configuration Parameters
> > > > >>
> > > > >> **Model-level (defaults)**:
> > > > >> - `batch.size`: Maximum records per batch (default: 1 for backward
> > > > >> compatibility)
> > > > >> - `batch.timeout.ms`: Max time to wait before flushing incomplete
> > > batch
> > > > >> (default: 1000ms)
> > > > >> - `system.prompt`: System-level instruction for the LLM
> > > > >> - `batch.prompt.template`: Template explaining how to process
> > batched
> > > > >> inputs
> > > > >> - `response.format`: Expected response format ('json', 'xml',
> > > > 'delimited')
> > > > >> - `response.path`: JSONPath or XPath to extract results array
> > > > >> - `response.index.field`: Field name containing the record index
> > > > >> - `response.value.field`: Field name containing the actual result
> > > > >> - `max.retries`: Retry attempts for failed batches (default: 3)
> > > > >> - `request.timeout.ms`: Timeout for API calls (default: 30000ms)
> > > > >>
> > > > >> **Query-level (overrides)**:
> > > > >> - Any of the above can be overridden via MAP parameter in
> ML_PREDICT
> > > > >> - Per-query customization for different analysis tasks
> > > > >>
> > > > >> ### Key Features
> > > > >> 1. **Prompt injection**: Automatically construct batch prompts
> with
> > > > >> indexed inputs
> > > > >> 2. **Structured response parsing**: Support JSON, XML, or
> delimited
> > > > >> formats
> > > > >> 3. **Index tracking**: Maintain record-to-result mapping through
> the
> > > > batch
> > > > >> 4. **Error handling**: Handle parsing failures, missing indices,
> > > > >> malformed responses
> > > > >> 5. **Fallback to individual calls**: If batch fails, optionally
> > retry
> > > > >> records individually
> > > > >> 6. **Provider-agnostic**: Works with any LLM API (OpenAI,
> Anthropic,
> > > > >> Azure, self-hosted)
> > > > >> 7. **Async processing**: Non-blocking batch requests
> > > > >> 8. **Back-pressure**: Proper flow control when API is slow
> > > > >> 9. **Backward compatible**: batch.size=1 maintains current
> behavior
> > > > >>
> > > > >> ### Technical Approach
> > > > >> - Extend existing ML_PREDICT infrastructure
> > > > >> - Add batching buffer in the ML_PREDICT operator
> > > > >> - Implement prompt template engine for batch construction:
> > > > >>   - Inject record index + content into template
> > > > >>   - Support various templating formats (JSON, XML, plain text)
> > > > >> - Implement response parser:
> > > > >>   - Extract structured data (JSONPath, XPath, regex)
> > > > >>   - Map results back to original records by index
> > > > >>   - Handle missing or malformed responses
> > > > >> - Maintain record ordering and error attribution
> > > > >> - Support parameter override mechanism in ML_PREDICT function
> > > signature
> > > > >>
> > > > >> ### Response Parsing Strategy
> > > > >>
> > > > >> The implementation must handle:
> > > > >> 1. **Successful batch response**: Parse and distribute results
> > > > >> 2. **Partial failure**: Some records missing from response → emit
> > > errors
> > > > >> for those
> > > > >> 3. **Complete parse failure**: Optionally fallback to individual
> > calls
> > > > >> 4. **Index mismatch**: Response indices don't match input → log
> > > warning
> > > > >> and best-effort match
> > > > >> 5. **Malformed JSON**: Retry with error handling
> > > > >>
> > > > >> Example error handling:
> > > > >> ```sql
> > > > >> -- Records that fail parsing get null results with error metadata
> > > > >> SELECT
> > > > >>     id,
> > > > >>     text,
> > > > >>     result.value as sentiment,
> > > > >>     result.error as error_msg
> > > > >> FROM source_table,
> > > > >> LATERAL TABLE(ML_PREDICT('llm_sentiment', text));
> > > > >> ```
> > > > >>
> > > > >> ## Limitations and Considerations
> > > > >>
> > > > >> 1. **LLM instruction following**: Depends on model's ability to
> > follow
> > > > >> structured
> > > > >>    output instructions. GPT-4 and Claude are reliable; older
> models
> > > may
> > > > >> struggle.
> > > > >>
> > > > >> 2. **Prompt size limits**: Batching too many records may exceed
> > > context
> > > > >> windows
> > > > >>    - GPT-4: ~8K tokens input limit
> > > > >>    - Claude: ~200K tokens but practical batches smaller
> > > > >>    - Need configurable max batch size based on average record
> length
> > > > >>
> > > > >> 3. **Token cost trade-off**: Larger batches mean:
> > > > >>    - Fewer API calls (good)
> > > > >>    - But larger prompts with instructions/formatting (slight
> > overhead)
> > > > >>    - Net savings still 80-90% in practice
> > > > >>
> > > > >> 4. **Parsing reliability**: Small risk of malformed responses
> > > > >>    - Mitigated by: clear instructions, JSON mode (GPT-4), retry
> > logic
> > > > >>    - Fallback to individual calls if batch parsing fails
> repeatedly
> > > > >>
> > > > >> 5. **Latency characteristics**:
> > > > >>    - Individual records see slightly higher latency (waiting for
> > > batch)
> > > > >>    - Overall throughput dramatically improved
> > > > >>    - Use `batch.timeout.ms` to balance latency vs throughput
> > > > >>
> > > > >> ## Future Extensions
> > > > >>
> > > > >> This batching architecture would support:
> > > > >> 1. **Stateful chat sessions**: Batch multiple turns of a
> > conversation
> > > > >> with
> > > > >>    maintained history per session key
> > > > >> 2. **Embedding generation**: Some providers (OpenAI) do have batch
> > > > >> embedding APIs
> > > > >> 3. **Multi-modal batching**: Batch image + text processing with
> > > > >> structured outputs
> > > > >>
> > > > >> ## Questions for the Community
> > > > >>
> > > > >> 1. **Architecture**: Should this extend ML_PREDICT or be a new
> > > function?
> > > > >>    (I propose extending ML_PREDICT for backward compatibility)
> > > > >>
> > > > >> 2. **FLIP Required?**: Does this enhancement warrant a FLIP?
> > > > >>
> > > > >> 3. **Existing Work**: Is anyone working on batching for ML_PREDICT
> > or
> > > > >> similar
> > > > >>    functionality?
> > > > >>
> > > > >> 4. **Prompt Template Engine**: Should we:
> > > > >>    - Build a custom template engine?
> > > > >>    - Use existing library (e.g., StringTemplate, Mustache)?
> > > > >>    - Keep it simple with String.format initially?
> > > > >>
> > > > >> 5. **Response Parsing**: Preferred approach:
> > > > >>    - JSONPath library (flexible but adds dependency)
> > > > >>    - Simple JSON parsing with field names
> > > > >>    - Pluggable parser interface for extensibility?
> > > > >>
> > > > >> 6. **Error Handling**: If parsing fails for entire batch:
> > > > >>    - Fail all records in batch?
> > > > >>    - Retry batch once more?
> > > > >>    - Fallback to individual calls (with circuit breaker)?
> > > > >>    - Make strategy configurable?
> > > > >>
> > > > >> 7. **Batch Assembly**: Should batching happen:
> > > > >>    - Per parallel instance (each task maintains its own batch)?
> > > > >>    - Globally coordinated (shuffle to batch coordinator)?
> > > > >>    - I propose per-instance for simplicity and lower latency
> > > > >>
> > > > >> 8. **Compatibility**: Default batch.size=1 to maintain current
> > > behavior,
> > > > >> users
> > > > >>    opt-in to batching?
> > > > >>
> > > > >> ## Why This Matters
> > > > >>
> > > > >> LLM inference is becoming a critical part of real-time data
> > pipelines.
> > > > >> Without
> > > > >> batching:
> > > > >> - Users face prohibitive costs for high-throughput workloads
> > > > >> - Rate limits block production deployments
> > > > >> - Latency makes real-time processing impractical
> > > > >>
> > > > >> While LLM providers don't offer native batch APIs,
> application-level
> > > > >> batching
> > > > >> through prompt engineering is a proven pattern used in production
> by
> > > > many
> > > > >> organizations. This proposal brings that capability natively into
> > > Flink.
> > > > >>
> > > > >> The hybrid configuration approach provides:
> > > > >> - **Sensible defaults** for common use cases (sentiment analysis,
> > > > >> classification)
> > > > >> - **Flexibility** to customize prompts and parsing for specific
> > needs
> > > > >> - **Easy migration** for existing queries (batch.size=1 default)
> > > > >>
> > > > >> ## Next Steps
> > > > >>
> > > > >> If there's interest from the community, I'm happy to:
> > > > >> 1. Prepare a detailed design document with prompt templates and
> > > parsing
> > > > >> examples
> > > > >> 2. Create a JIRA ticket
> > > > >> 3. Develop a prototype demonstrating the batching and parsing
> logic
> > > > >> 4. Write a FLIP if required
> > > > >>
> > > > >> Looking forward to your feedback and guidance on how best to
> > > proceed!--
> > > > >> Thanks And Regards
> > > > >> Rahul
> > > > >>
> > > > >
> > > > >
> > > > > --
> > > > > Thanks And Regards
> > > > > Rahul
> > > > >
> > > >
> > > >
> > > > --
> > > > Thanks And Regards
> > > > Rahul
> > > >
> > >
> >
>

Reply via email to