Hi Matt, Good point on read_generic/write_generic over read_custom()/write_custom(). I have updated the FLIP to reflect this.
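For readers skimming the thread, here is a minimal sketch of the escape-hatch idea behind read_generic/write_generic. It is not taken from the FLIP: the helper names `connector_options` and `render_with_clause` are hypothetical, and the assumption is only that a generic call forwards a connector name plus free-form keyword options, in the spirit of the string key/value pairs of a Flink SQL `WITH (...)` clause.

```python
def connector_options(connector: str, **options: object) -> dict:
    """Collect a connector name and free-form options into string pairs.

    Hypothetical helper for illustration; not part of PyFlink or FLIP-591.
    """
    if not connector:
        raise ValueError("connector must be a non-empty string")
    merged = {"connector": connector}
    for key, value in options.items():
        # Connector options are plain strings, so stringify every value.
        merged[key] = str(value)
    return merged


def render_with_clause(options: dict) -> str:
    """Render the options as a SQL-style WITH clause (illustrative only)."""
    lines = []
    for key, value in options.items():
        escaped = value.replace("'", "''")  # escape single quotes for SQL
        lines.append(f"  '{key}' = '{escaped}'")
    return "WITH (\n" + ",\n".join(lines) + "\n)"


# Same arguments as the read_generic example quoted later in this thread.
opts = connector_options("elasticsearch", hosts="localhost:9200", index="events")
print(render_with_clause(opts))
```

The point of the sketch is that nothing connector-specific is implemented: the call only passes options through to an existing connector, which is why "generic" describes it better than "custom".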
Regards,
Dian

On Thu, Jul 2, 2026 at 9:06 PM Dian Fu <[email protected]> wrote:
>
> Hi Cole,
>
> Thanks for the detailed feedback. I went through the points and they all make sense.
>
> I have updated the FLIP for the following places:
>
> - Updated OVER window with the flatten API, it does make more sense, good point!
> - Replaced window with 4 APIs: tumble/hop/cumulate/session. It would be great if you could take a further look at it.
> - Replaced `filter(predicate, **constraints)` with `filter(*predicates, **constraints)`
> - Added `iter_rows`, `iter_batches`, and `take` in the FLIP
>
> Regarding your concern on the `take` API, do you think it makes sense to introduce some kind of API like `take_batch` (just like what's done in Ray Data)? It may look like the following:
>
> ```
> def take_batch(
>     self,
>     n: int,
>     *,
>     timeout: int | None = None,
>     batch_format: Literal["pandas", "pyarrow"] = "pandas",
>     include_row_kind: bool = False,
>     row_kind_field: str = "__row_kind__",
> ) -> pandas.DataFrame | pyarrow.Table
> ```
>
> Do you think it could address the notebook/IPython concern you had in mind?
>
> Best,
> Dian
>
> On Thu, Jul 2, 2026 at 3:30 AM Matt Belle via dev <[email protected]> wrote:
> >
> > Hi Dian,
> >
> > Thanks for putting together FLIP-591, really solid work.
> >
> > I wanted to suggest renaming `read_custom()` and `write_custom()` to `read_generic()` and `write_generic()`.
> >
> > As someone experienced with Python but new to Flink, my immediate assumption with these functions was that they were for implementing my own custom connectors. After reading through the discussion thread, I see now that they're actually escape hatches for accessing existing Flink connectors that don't have dedicated DataFrame helpers yet (Elasticsearch, Cassandra, etc).
> >
> > I think the semantic difference matters here: "generic" implies working with an existing definition (any pre-defined connector), while "custom" implies any definition including user-defined ones. Switching to "generic" would make it more clear that this is for accessing existing connectors we haven't wrapped yet, not for implementing new ones.
> >
> > ```python
> > df = pf.read_generic(
> >     connector="elasticsearch",
> >     hosts="localhost:9200",
> >     index="events"
> > )
> > ```
> >
> > Just a naming change, no functional impact, but I think it would save confusion for other folks coming from the larger Python ecosystem.
> >
> > Thanks,
> > Matt
> >
> > On 2026/06/18 15:26:08 Dian Fu wrote:
> > > Hi FangYong,
> > >
> > > Thanks for the comments. These are very good points, especially for multimodal / AI-oriented workloads.
> > >
> > > 1. For multimodal read/write APIs, the current intent is to keep this FLIP focused on the general DataFrame API surface, while using read_custom / write_custom as the escape hatch for connectors or formats that do not yet have dedicated helpers. For multimodal data specifically, I totally agree with you that more explicit APIs should be introduced, e.g. read_video_frames, etc. You may have noticed that multimodal data types are currently also not included in this FLIP. I'd like to make multimodal data support a separate FLIP (once multimodal data type support, which is under discussion in FLIP-589 and FLIP-590, is ready).
> > >
> > > 2. For map_batches, I agree with your point. For the initial FLIP, we currently keep batch_size for simplicity. I think introducing memory-budget-based batching is a good direction which may be worth a separate FLIP to make sure it's well designed.
> > >
> > > 3. GPU resource configuration is also a very good point: It depends on some work from Yi Zhang. However, I believe we will introduce GPU support in Python DataFrame API soon.
> > >
> > > 4. For class-based processors in map / map_batches: It's planned to be supported. I have added examples in the corresponding sections.
> > >
> > > Regards,
> > > Dian
> > >
> > > On Thu, Jun 18, 2026 at 10:25 PM Dian Fu <[email protected]> wrote:
> > > >
> > > > Hi Zander,
> > > >
> > > > Thanks for the feedback!
> > > >
> > > > Replies inline.
> > > >
> > > > Relationship to FLIP-541: Good point. FLIP-541 took the approach of evolving the Table API itself toward a more DataFrame-like style. In practice, though, I found that it turned out to be hard to push forward: the Table API already has a large API surface, and for compatibility reasons the DataFrame-style additions would have to coexist with the existing Table-style APIs. Mixing the two styles in one API tends to make things more confusing for users rather than less. So I think a more viable direction may be to keep the Table API positioned as the API aligned with Flink's existing relational/Table model and introduce a separate, dedicated DataFrame API alongside it. That keeps each API internally consistent (Table API for the Flink-native relational model, DataFrame API for the Python/DataFrame mental model) instead of blending both into one surface.
> > > >
> > > > API parity: My overall take is that we should adopt a DataFrame-style API design rather than following the pandas DataFrame API 100%. This is also the direction taken by the newer generation of Python data projects such as Polars, Daft, and Ray Data: they share the familiar DataFrame mental model and ergonomics, but each adapts the API to its own execution engine and semantics rather than mirroring pandas exactly. We follow the same philosophy here: borrow the widely-adopted ergonomics, but stay consistent with Flink's streaming/distributed semantics.
> > > >
> > > > On the specific points:
> > > > 1. dropna/fillna vs drop_null/fill_null: I'd lean toward keeping drop_null/fill_null. Flink's type system has a real NULL (and NaN is a distinct float value) and actually Python has None, so the FLIP separates them: drop_null/fill_null for NULL and drop_nan/fill_nan for NaN. The pandas dropna name conflates the two. Polars made the same NULL/NaN split with drop_nulls/fill_null + fill_nan, so this also has precedent.
> > > > 2. show() / display(): Good point. Peeking at data is essential. Have added show, __repr__, _repr_html_ and _repr_mimebundle_ in the section "DataFrame — Conversion & Execution".
> > > > 3. sort(by, descending=False): I kept descending= rather than ascending=. The default is ascending in all cases, matching pandas/PySpark/Polars/Daft behavior; the only difference is the flag name. The flag name descending aligns with Polars/Daft/Ray, while PySpark/pandas use ascending as the flag name. I lean slightly toward aligning with Polars/Daft/Ray since our motivation is multimodal data processing and our users will be familiar with these projects. However, I'm open to this.
> > > > 4. map() element-wise vs row-wise: You're right that pandas DataFrame.map is element-wise (it's the renamed applymap), while ours is row-wise. We intentionally follow the Ray Data model here (map = per-row, map_batches = vectorized), which fits the enrichment/ML use cases. The name `apply` isn't paired with map_batches and so I tend to use `map` here.
> > > > 5. Index: Yes, this is deliberate: the API has no implicit pandas-style index. Flink is a distributed/streaming engine with no global row order, so a positional index isn't well-defined. Row/column selection is done by column reference and boolean filtering instead.
> > > > 6. in_place: DataFrames here are lazy logical plans, not mutable in-memory buffers, so every operation returns a new DataFrame; there's nothing to mutate in place.
> > > > 7. to_json/from_json and .values(): JSON is supported for sources/sinks via read_json/write_json. Is that what you want?
> > > > 8. Properties (df.shape/df.info/df.describe): Have added `describe()` in section "13. DataFrame — Conversion & Execution". Regarding `df.shape` and `df.info`: I left them out for now, since `df.shape` would force data processing, which does not seem that useful, and schema inspection is already available via df.schema.
> > > >
> > > > Execution model / low-level trigger: I like the escape-hatch idea. Today an execution can be triggered via to_pandas()/to_list()/collect()/show(). Could you elaborate on what specific API you have in mind?
> > > >
> > > > EDA / common use cases: The primary use cases I envision are ML/AI enrichment (this is to support FLIP-577: AI-Native Flink — An Umbrella Proposal for Multimodal Data Processing). EDA is supported (show/describe/to_pandas) but I'd keep the heavier EDA helpers minimal and grow them based on demand.
> > > >
> > > > Thanks again and looking forward to collaborating!
> > > >
> > > > Regards,
> > > > Dian
> > > >
> > > > On Thu, Jun 18, 2026 at 8:50 PM Dian Fu <[email protected]> wrote:
> > > > >
> > > > > Hi Cole,
> > > > >
> > > > > Thanks for the feedback. Both suggestions make sense to me, and I've updated the FLIP.
> > > > >
> > > > > 1. kwargs aliasing in .agg: see section "6. DataFrame — Aggregation"
> > > > > 2. Attribute-style column references: see section "15. DataFrame — Column Access".
> > > > >
> > > > > Thanks again!
> > > > >
> > > > > Regards,
> > > > > Dian
> > > > >
> > > > > On Thu, Jun 18, 2026 at 11:06 AM Yong Fang <[email protected]> wrote:
> > > > > >
> > > > > > Thanks Fu Dian for initiating this discussion and +1 for the overall design.
> > > > > >
> > > > > > I have several comments for this FLIP:
> > > > > >
> > > > > > 1. In multi-modal data processing scenarios, the operations are mostly reading and writing files, images, audio and video. From the current API perspective, these operations can only be added manually in read_custom and write_custom. How are the read/write APIs for these types designed?
> > > > > >
> > > > > > 2. Currently, batch_size is controlled by row count in map_batches. However, the per-row size of multimodal data varies dramatically: a single 4K image can be up to 20 MB, while a piece of text may only be 100 B. Splitting batches purely by row count may lead to OOM. Should we support splitting batches by memory budget too?
> > > > > >
> > > > > > 3. GPU computing is a common requirement in multimodal processing, but I don't see any related information in this set of APIs, such as map/map_batches etc. How can we set GPU/CPU resources and specifications for them and for UDFs?
> > > > > >
> > > > > > 4. In addition, users may need to load local models within map/map_batches for data processing. The current APIs only support the callback format. Should class types also be supported? This way, I/O and other operations only need to be executed once for a class instance.
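To make point 2 above concrete, the following is a minimal sketch of splitting a row stream by memory budget rather than by row count alone. It is an illustration only, not the FLIP's design and not an existing PyFlink API: `batch_by_memory_budget`, `size_of`, `max_bytes` and `max_rows` are hypothetical names, and the sketch assumes the caller can estimate each row's size cheaply.

```python
from typing import Callable, Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


def batch_by_memory_budget(
    rows: Iterable[T],
    size_of: Callable[[T], int],
    max_bytes: int,
    max_rows: Optional[int] = None,
) -> Iterator[List[T]]:
    """Yield batches whose estimated size stays within max_bytes.

    Hypothetical helper. A single row larger than max_bytes is emitted
    as its own one-row batch instead of being dropped.
    """
    if max_bytes <= 0:
        raise ValueError("max_bytes must be positive")
    if max_rows is not None and max_rows <= 0:
        raise ValueError("max_rows must be positive when given")

    batch: List[T] = []
    batch_bytes = 0
    for row in rows:
        row_bytes = size_of(row)
        over_bytes = bool(batch) and batch_bytes + row_bytes > max_bytes
        over_rows = max_rows is not None and len(batch) >= max_rows
        if over_bytes or over_rows:
            # Close the current batch before adding the row that would overflow it.
            yield batch
            batch, batch_bytes = [], 0
        batch.append(row)
        batch_bytes += row_bytes
    if batch:
        yield batch


# Small text payloads mixed with one large "image" payload.
text, image = b"t" * 100, b"i" * 5000
rows = [text, text, image, text, text, text]
batches = list(batch_by_memory_budget(rows, size_of=len, max_bytes=1000))
print([len(b) for b in batches])  # [2, 1, 3]
```

With a pure row-count limit of, say, 3 rows, the large payload would share a batch with two text rows; here it is isolated, and the optional `max_rows` cap can still bound batches of tiny rows.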
> > > > > >
> > > > > > Best,
> > > > > > FangYong
> > > > > >
> > > > > > On Thu, Jun 18, 2026 at 7:13 AM Zander Matheson <[email protected]> wrote:
> > > > > >
> > > > > > > Very nice proposal, Dian Fu, thank you for putting this together!
> > > > > > >
> > > > > > > It feels like it supersedes FLIP-541 <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=378473217> since it will solve many of the problems we initially discussed related to making Flink more Pythonic.
> > > > > > >
> > > > > > > I had some questions and comments on the FLIP, shared below. Overall I really like the design you have proposed and look forward to hopefully collaborating!
> > > > > > >
> > > > > > > *Pandas/PySpark/Polars API Parity*
> > > > > > > What are your thoughts on parity with some of the established APIs? I see the FLIP mentions both lowering the barrier to entry for Python users who are familiar with DataFrame APIs and having a non-goal of providing full compatibility with Pandas, Polars etc. Below are some common DataFrame API conventions that we could potentially align with more closely.
> > > > > > >
> > > > > > > 1. dropna and fillna instead of drop_null/fill_null. Python does not have a Null concept, so it might make sense to keep the dropna used in other dataframe libraries?
> > > > > > > 2. show(), .display(). How to peek at the data? This is a common pattern in development.
> > > > > > > 3. sort(by, descending=False). This boolean flag is the opposite of what is used in other libraries.
> > > > > > > 4. map() in DataFrame land is an element-wise computation, do we want to break with that? I believe apply() is more similar. That being said, map as intended from map reduce is more akin to the row-wise model shown. More a question than anything else.
> > > > > > > 5. Index - I didn't see any mention of indices in the document, is this something we are explicitly avoiding? I have often used indices for selecting groups of rows or columns for manipulation.
> > > > > > > 6. In_place - this may not be possible with the execution style, but is there a possibility of having in_place booleans for things like unique or sort, where the operation can either be done in place or the output needs to be assigned to a new dataframe?
> > > > > > > 7. to_json/from_json and .values().
> > > > > > > 8. Properties - Do we want to add some of the niceties of spark/pandas here if possible? df.shape, df.info, df.describe
> > > > > > >
> > > > > > > *Execution Model*
> > > > > > > I like the idea of having triggers for the execution on write and materialization (to_pandas etc.). This is probably the meat of the problem for creating a dataframe API that feels like the other commonly used dataframe APIs. The balance between write and providing the statement_set to make the writes coordinated seems nice. But is there also potentially a world where we want to expose the ability to arbitrarily trigger an execution? Say for Exploratory Data Analysis in a notebook? This seems possible with to_pandas and to_list etc., but maybe having a lower level primitive that could be called could be a good escape hatch?
> > > > > > >
> > > > > > > *Exploratory Data Analysis*
> > > > > > > One of the biggest use cases for dataframes is exploratory data analysis. Is this something we want to encourage with this FLIP? It might make sense to add some of the EDA methods for that purpose. See above, show/display and execution trigger.
> > > > > > >
> > > > > > > *Common Use Cases*
> > > > > > > Related to the EDA note above, I am curious what you envision as the most common use cases for this API.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Zander
> > > > > > >
> > > > > > > On Wed, Jun 17, 2026 at 2:42 AM Cole Bailey via dev <[email protected]> wrote:
> > > > > > >
> > > > > > > > Thanks Dian,
> > > > > > > >
> > > > > > > > There is a lot of good work here that aligns with what we have been brainstorming for a better PyFlink experience.
> > > > > > > >
> > > > > > > > One ergonomic suggestion I'd love to see included is supporting pythonic aliasing via kwargs within `.agg` similar to what is already outlined in `with_columns` or `select`:
> > > > > > > >
> > > > > > > > The example would instead look like this:
> > > > > > > >
> > > > > > > > df.group_by("dept").agg(
> > > > > > > >     avg_salary=col("salary").avg(),
> > > > > > > >     headcount=col("id").count(),
> > > > > > > > )
> > > > > > > >
> > > > > > > > Another nice-to-have would be flexibility in column referencing, I see 2 variations scattered throughout the FLIP:
> > > > > > > >
> > > > > > > > col("age")
> > > > > > > >
> > > > > > > > df["age"]
> > > > > > > >
> > > > > > > > Both of these make sense, I think we should also consider supporting attr style column references since these can be reused across the lambda or subscript filtering examples already in the FLIP:
> > > > > > > >
> > > > > > > > df.age
> > > > > > > >
> > > > > > > > That would then give us this representative example:
> > > > > > > >
> > > > > > > > df.group_by("dept").agg(
> > > > > > > >     avg_salary=df.salary.avg(),
> > > > > > > >     headcount=df.id.count(),
> > > > > > > > )
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Cole
> > > > > > > >
> > > > > > > > On Wed, Jun 17, 2026 at 6:12 AM Dian Fu <[email protected]> wrote:
> > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > I would like to start a discussion about FLIP-591: Introducing Python DataFrame API in PyFlink [1].
> > > > > > > > >
> > > > > > > > > This FLIP is a sub-FLIP of the broader direction discussed in FLIP-577 (AI-Native Flink — An Umbrella Proposal for Multimodal Data Processing) [2]. This FLIP proposes a new public Python module, `pyflink.dataframe`, as a DataFrame-style API on top of the existing PyFlink Table API. The goal is not to introduce a new execution model, but to provide a more natural Python-facing entry point for users coming from the broader Python data ecosystem, while preserving Flink semantics and execution capabilities.
> > > > > > > > >
> > > > > > > > > The proposal focuses on:
> > > > > > > > > - Designing a Python-friendly DataFrame API for PyFlink, including the API shape itself, a more user-friendly DataType design, unified configuration, reduced TableEnvironment boilerplate, and a practical multiple-sink model for end-to-end pipelines
> > > > > > > > > - Providing ergonomic support for row-oriented Python transformations, including map / map_batches style operations for enrichment, feature engineering, and AI/ML workloads
> > > > > > > > > - Exposing concurrency configuration so that expensive Python stages can be scaled independently, making it easier to build practical jobs directly with the DataFrame API
> > > > > > > > > - Supporting Arrow as a first-class batch format for efficient interoperability with the Python ecosystem
> > > > > > > > >
> > > > > > > > > The Design Decisions section discusses the main design considerations behind the proposal and may be a useful place to pay extra attention when reviewing it.
> > > > > > > > >
> > > > > > > > > Looking forward to your feedback!
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Dian
> > > > > > > > >
> > > > > > > > > [1]
> > > > > > > > > > [message truncated...]
