Hi Dian,

Thanks for incorporating the feedback. It looks solid overall and I'm happy
with the current state.

Including `take_batch` together with `take` does feel like a simple and
intuitive solution, especially since type hints would surface it to the
user, making it easier to grasp than reaching for a different syntax.
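
To make the distinction concrete, here is a minimal sketch, not the FLIP's
implementation. `TinyFrame` is a hypothetical in-memory stand-in: `take`
returns row dicts and `take_batch` returns one columnar batch. A plain dict of
columns stands in for a pandas DataFrame or pyarrow Table, and the `timeout`,
`batch_format`, and row-kind parameters from the proposed signature are
omitted.

```python
from typing import Any


class TinyFrame:
    """Hypothetical in-memory stand-in for a DataFrame (illustration only)."""

    def __init__(self, rows: list[dict[str, Any]]) -> None:
        self._rows = rows

    def take(self, n: int) -> list[dict[str, Any]]:
        # Row-oriented result: one dict per row.
        return self._rows[:n]

    def take_batch(self, n: int) -> dict[str, list[Any]]:
        # Column-oriented result: one list per column for the same first n rows.
        rows = self._rows[:n]
        columns = list(rows[0]) if rows else []
        return {name: [row[name] for row in rows] for name in columns}


df = TinyFrame(
    [{"id": 1, "dept": "a"}, {"id": 2, "dept": "b"}, {"id": 3, "dept": "a"}]
)
rows = df.take(2)
batch = df.take_batch(2)
```

The two methods differ only in return shape, which is what a type hint would
make visible in a notebook or IDE.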

Best,
Cole

On Thu, Jul 2, 2026 at 3:28 PM Dian Fu <[email protected]> wrote:

> Hi Cole,
>
> I missed two points in your last email. (I noticed them but forgot to reply.)
>
> - Regarding `with_column` and `with_columns`: I lean slightly toward
> retaining both of them. `with_column` is widely used in Ray Data and
> Daft, and I find it a very convenient way to add a single column to
> an existing DataFrame.
>
> - Regarding `map`:
> >> With `map` expecting the `return_dtype` as an argument, this feels
> >> clunky when you might want to re-use a function in multiple places. It
> >> would be much better to define the return type as a type hint or annotation
> >> in the function itself to avoid coupling typing to the transformation
> >> call-site rather than the transformation definition itself.
> Oh, actually the return_dtype is not mandatory. `map` also supports type
> hints and accepts a function with the @udf decorator; in either case there
> is no need to declare the return_dtype.
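
A minimal sketch of how a return type could be resolved from either source,
assuming an explicit `return_dtype` wins over the annotation. The helper
`resolve_return_dtype` is hypothetical and not part of the FLIP; plain Python
types stand in for Flink data types.

```python
from typing import Callable, Optional, get_type_hints


def resolve_return_dtype(fn: Callable, return_dtype: Optional[type] = None) -> type:
    """Hypothetical helper: use return_dtype if given, else the return annotation."""
    if return_dtype is not None:
        return return_dtype
    hints = get_type_hints(fn)
    if "return" not in hints:
        raise TypeError(f"{fn.__name__} needs a return annotation or return_dtype")
    return hints["return"]


def name_length(row: dict) -> int:
    # The return type is declared once, at the definition site.
    return len(row["name"])


inferred = resolve_return_dtype(name_length)
explicit = resolve_return_dtype(lambda row: row["name"], return_dtype=str)
```

With this shape, an annotated function can be reused at several call sites
without repeating its type.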
>
> >> A separate question on `map` - your examples do not mark the functions as
> >> UDFs. Is this a convenience syntax? I assume these will be registered to
> >> run as UDFs under the hood, is that right?
> Yes, you are right.
>
> Regards,
> Dian
>
> On Thu, Jul 2, 2026 at 9:08 PM Dian Fu <[email protected]> wrote:
> >
> > Hi Matt,
> >
> > Good point on read_generic/write_generic over
> > read_custom()/write_custom(). I have updated the FLIP to reflect this.
> >
> > Regards,
> > Dian
> >
> > On Thu, Jul 2, 2026 at 9:06 PM Dian Fu <[email protected]> wrote:
> > >
> > > Hi Cole,
> > >
> > > Thanks for the detailed feedback. I went through the points and they
> > > all make sense.
> > >
> > > I have updated the FLIP for the following places:
> > >
> > > - Updated OVER window with the flatten API, it does make more sense,
> > > good point!
> > > - Replaced window with 4 APIs: tumble/hop/cumulate/session. It would
> > > be great if you could take a further look at it.
> > > - Replaced `filter(predicate, **constraints)` with
> > > `filter(*predicates, **constraints)`
> > > - Added `iter_rows`, `iter_batches`, and `take` in the FLIP
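
A minimal sketch of the `filter(*predicates, **constraints)` shape, assuming
(as in Polars) that all predicates are combined with AND and that each keyword
constraint means column == value. `filter_rows` is a hypothetical helper over
plain row dicts, not the FLIP's API.

```python
from typing import Any, Callable

Row = dict[str, Any]


def filter_rows(
    rows: list[Row], *predicates: Callable[[Row], bool], **constraints: Any
) -> list[Row]:
    """Keep rows that pass every predicate and match every column=value constraint."""

    def keep(row: Row) -> bool:
        return all(p(row) for p in predicates) and all(
            row.get(column) == value for column, value in constraints.items()
        )

    return [row for row in rows if keep(row)]


people = [
    {"name": "a", "age": 25, "dept": "eng"},
    {"name": "b", "age": 35, "dept": "eng"},
    {"name": "c", "age": 40, "dept": "ops"},
]
senior_eng = filter_rows(people, lambda r: r["age"] >= 30, dept="eng")
```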
> > >
> > > Regarding your concern on the `take` API, do you think it makes sense
> > > to introduce some kind of API like `take_batch` (just like what's done
> > > in Ray Data)? It may look like the following:
> > >
> > > ```
> > >     def take_batch(
> > >         self,
> > >         n: int,
> > >         *,
> > >         timeout: int | None = None,
> > >         batch_format: Literal["pandas", "pyarrow"] = "pandas",
> > >         include_row_kind: bool = False,
> > >         row_kind_field: str = "__row_kind__",
> > >     ) -> pandas.DataFrame | pyarrow.Table
> > > ```
> > >
> > > Do you think it could address the notebook/IPython concern you had in
> > > mind?
> > >
> > > Best,
> > > Dian
> > >
> > > On Thu, Jul 2, 2026 at 3:30 AM Matt Belle via dev <[email protected]> wrote:
> > > >
> > > > Hi Dian,
> > > >
> > > > Thanks for putting together FLIP-591, really solid work.
> > > >
> > > > I wanted to suggest renaming `read_custom()` and `write_custom()` to `read_generic()` and `write_generic()`.
> > > >
> > > > As someone experienced with Python but new to Flink, my immediate assumption with these functions was that they were for implementing my own custom connectors. After reading through the discussion thread, I see now that they're actually escape hatches for accessing existing Flink connectors that don't have dedicated DataFrame helpers yet (Elasticsearch, Cassandra, etc).
> > > >
> > > > I think the semantic difference matters here: "generic" implies working with an existing definition (any pre-defined connector), while "custom" implies any definition including user-defined ones. Switching to "generic" would make it clearer that this is for accessing existing connectors we haven't wrapped yet, not for implementing new ones.
> > > >
> > > > ```python
> > > > df = pf.read_generic(
> > > >     connector="elasticsearch",
> > > >     hosts="localhost:9200",
> > > >     index="events"
> > > > )
> > > > ```
> > > >
> > > > Just a naming change, no functional impact, but I think it would save confusion for other folks coming from the larger Python ecosystem.
> > > >
> > > > Thanks,
> > > > Matt
> > > >
> > > > On 2026/06/18 15:26:08 Dian Fu wrote:
> > > > > Hi FangYong,
> > > > >
> > > > > Thanks for the comments. These are very good points, especially for
> > > > > multimodal / AI-oriented workloads.
> > > > >
> > > > > 1. For multimodal read/write APIs, the current intent is to keep this
> > > > > FLIP focused on the general DataFrame API surface, while using
> > > > > read_custom / write_custom as the escape hatch for connectors or
> > > > > formats that do not yet have dedicated helpers. For multimodal data
> > > > > specifically, I totally agree with you that more explicit APIs should
> > > > > be introduced, e.g. read_video_frames, etc. You may have noticed that
> > > > > multimodal data types are currently not included in this FLIP either.
> > > > > I'd like to cover multimodal data support in a separate FLIP, once
> > > > > support for multimodal data types (under discussion in FLIP-589 and
> > > > > FLIP-590) is ready.
> > > > >
> > > > > 2. For map_batches, I agree with your point. For the initial FLIP, we
> > > > > currently keep batch_size for simplicity. I think introducing
> > > > > memory-budget-based batching is a good direction which may be worth a
> > > > > separate FLIP to make sure it's well designed.
> > > > >
> > > > > 3. GPU resource configuration is also a very good point. It depends on
> > > > > some work from Yi Zhang. However, I believe we will introduce GPU
> > > > > support in the Python DataFrame API soon.
> > > > >
> > > > > 4. For class-based processors in map / map_batches: this is planned to be
> > > > > supported. I have added examples in the corresponding sections.
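
A minimal sketch of why class-based processors help, assuming the runtime
instantiates the class once and then calls the instance per row. `Scorer` and
`map_rows` are hypothetical stand-ins and not the FLIP's API; the weights dict
stands in for an expensive model load.

```python
from typing import Any, Callable, Union

Row = dict[str, Any]


class Scorer:
    """Hypothetical class-based processor: setup runs once per instance."""

    loads = 0  # counts how many times the expensive setup ran

    def __init__(self) -> None:
        Scorer.loads += 1
        self.weights = {"a": 1.0, "b": 2.0}  # stand-in for loading a model

    def __call__(self, row: Row) -> Row:
        return {**row, "score": self.weights.get(row["dept"], 0.0)}


def map_rows(rows: list[Row], processor: Union[type, Callable[[Row], Row]]) -> list[Row]:
    # A class is instantiated once; a plain callable is used as-is.
    fn = processor() if isinstance(processor, type) else processor
    return [fn(row) for row in rows]


scored = map_rows([{"dept": "a"}, {"dept": "b"}, {"dept": "a"}], Scorer)
```

Three rows are processed, but the setup in `__init__` runs only once.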
> > > > >
> > > > > Regards,
> > > > > Dian
> > > > >
> > > > > On Thu, Jun 18, 2026 at 10:25 PM Dian Fu <[email protected]> wrote:
> > > > > >
> > > > > > Hi Zander,
> > > > > >
> > > > > > Thanks for the feedback!
> > > > > >
> > > > > > Replies inline.
> > > > > >
> > > > > > Relationship to FLIP-541: Good point. FLIP-541 took the approach of
> > > > > > evolving the Table API itself toward a more DataFrame-like style. In
> > > > > > practice, though, I found that it turned out to be hard to push
> > > > > > forward: the Table API already has a large API surface, and for
> > > > > > compatibility reasons the DataFrame-style additions would have to
> > > > > > coexist with the existing Table-style APIs. Mixing the two styles in
> > > > > > one API tends to make things more confusing for users rather than
> > > > > > less. So I think a more viable direction may be to keep the Table API
> > > > > > positioned as the API aligned with Flink's existing relational/Table
> > > > > > model and introduce a separate, dedicated DataFrame API alongside it.
> > > > > > That keeps each API internally consistent (Table API for the
> > > > > > Flink-native relational model, DataFrame API for the Python/DataFrame
> > > > > > mental model) instead of blending both into one surface.
> > > > > >
> > > > > > API parity: My overall take is that we should adopt a DataFrame-style
> > > > > > API design rather than 100% following the pandas DataFrame API. This is
> > > > > > also the direction taken by the newer generation of Python data
> > > > > > projects such as Polars, Daft, and Ray Data: they share the familiar
> > > > > > DataFrame mental model and ergonomics, but each adapts the API to its
> > > > > > own execution engine and semantics rather than mirroring pandas
> > > > > > exactly. We follow the same philosophy here: borrow the
> > > > > > widely-adopted ergonomics, but stay consistent with Flink's
> > > > > > streaming/distributed semantics.
> > > > > >
> > > > > > On the specific points:
> > > > > > 1. dropna/fillna vs drop_null/fill_null: I'd lean toward keeping
> > > > > > drop_null/fill_null. Flink's type system has a real NULL (and NaN is a
> > > > > > distinct float value), and Python has None, so the FLIP
> > > > > > separates them: drop_null/fill_null for NULL and drop_nan/fill_nan for
> > > > > > NaN. The pandas dropna name conflates the two. Polars made the same
> > > > > > NULL/NaN split with drop_nulls/fill_null + fill_nan, so this also has
> > > > > > precedent.
> > > > > > 2. show() / display(): Good point. Peeking at data is essential. I have
> > > > > > added show, __repr__, _repr_html_ and _repr_mimebundle_ in the section
> > > > > > "DataFrame — Conversion & Execution".
> > > > > > 3. sort(by, descending=False): I kept descending= rather than
> > > > > > ascending=. The default is ascending in all cases, matching
> > > > > > pandas/PySpark/Polars/Daft behavior; the only difference is the flag
> > > > > > name. The flag name descending aligns with Polars/Daft/Ray, while
> > > > > > PySpark/pandas take ascending as the flag name. I slightly
> > > > > > tend to align with Polars/Daft/Ray since our motivation is multimodal
> > > > > > data processing and our users will be familiar with these projects.
> > > > > > However, I'm open to this.
> > > > > > 4. map() element-wise vs row-wise: You're right that pandas
> > > > > > DataFrame.map is element-wise (it's the renamed applymap), while ours
> > > > > > is row-wise. We intentionally follow the Ray Data model here (map =
> > > > > > per-row, map_batches = vectorized), which fits the enrichment/ML use
> > > > > > cases. The name `apply` isn't paired with map_batches and so I tend to
> > > > > > use `map` here.
> > > > > > 5. Index: Yes, this is deliberate: the API has no implicit
> > > > > > pandas-style index. Flink is a distributed/streaming engine with no
> > > > > > global row order, so a positional index isn't well-defined. Row/column
> > > > > > selection is done by column reference and boolean filtering instead.
> > > > > > 6. in_place: DataFrames here are lazy logical plans, not mutable
> > > > > > in-memory buffers, so every operation returns a new DataFrame —
> > > > > > there's nothing to mutate in place.
> > > > > > 7. to_json/from_json and .values(): JSON is supported for
> > > > > > sources/sinks via read_json/write_json. Is that what you want?
> > > > > > 8. Properties (df.shape/df.info/df.describe): I have added `describe()`
> > > > > > in section "13. DataFrame — Conversion & Execution". Regarding
> > > > > > `df.shape` and `df.info`: I left them out for now, since `df.shape`
> > > > > > would force data processing, which does not seem that useful, and schema
> > > > > > inspection is already available via df.schema.
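
A minimal sketch of the NULL/NaN split from point 1, using plain Python
values: `None` plays the role of NULL and `float("nan")` is an ordinary float.
The list helpers below borrow the `drop_null` / `drop_nan` names for
illustration only; they are hypothetical and not the FLIP's DataFrame methods.

```python
import math
from typing import Any


def is_nan(value: Any) -> bool:
    # NaN is a float value, so check the type before calling math.isnan.
    return isinstance(value, float) and math.isnan(value)


def drop_null(values: list[Any]) -> list[Any]:
    """Remove missing values (None) but keep NaN."""
    return [v for v in values if v is not None]


def drop_nan(values: list[Any]) -> list[Any]:
    """Remove NaN floats but keep missing values (None)."""
    return [v for v in values if not is_nan(v)]


values = [1.0, None, float("nan"), 4.0]
without_null = drop_null(values)  # NaN is still present
without_nan = drop_nan(values)    # None is still present
```

A single `dropna`-style operation would have to pick one of these behaviors or
merge both, which is the conflation mentioned above.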
> > > > > >
> > > > > > Execution model / low-level trigger: I like the escape-hatch idea.
> > > > > > Today an execution can be triggered via
> > > > > > to_pandas()/to_list()/collect()/show(). Could you elaborate on what
> > > > > > specific API you have in mind?
> > > > > >
> > > > > > EDA / common use cases: The primary use cases I envision are ML/AI
> > > > > > enrichment (this is to support FLIP-577: AI-Native Flink — An Umbrella
> > > > > > Proposal for Multimodal Data Processing). EDA is supported
> > > > > > (show/describe/to_pandas) but I'd keep the heavier EDA helpers minimal
> > > > > > and grow them based on demand.
> > > > > >
> > > > > > Thanks again and looking forward to collaborating!
> > > > > >
> > > > > > Regards,
> > > > > > Dian
> > > > > >
> > > > > > On Thu, Jun 18, 2026 at 8:50 PM Dian Fu <[email protected]> wrote:
> > > > > > >
> > > > > > > Hi Cole,
> > > > > > >
> > > > > > > Thanks for the feedback. Both suggestions make sense to me, and I've
> > > > > > > updated the FLIP.
> > > > > > >
> > > > > > > 1. kwargs aliasing in .agg: see section "6. DataFrame — Aggregation"
> > > > > > > 2. Attribute-style column references: see section "15. DataFrame —
> > > > > > > Column Access".
> > > > > > >
> > > > > > > Thanks again!
> > > > > > >
> > > > > > > Regards,
> > > > > > > Dian
> > > > > > >
> > > > > > > On Thu, Jun 18, 2026 at 11:06 AM Yong Fang <[email protected]> wrote:
> > > > > > > >
> > > > > > > > Thanks Fu Dian for initiating this discussion and +1 for the overall design.
> > > > > > > >
> > > > > > > > I have several comments on this FLIP:
> > > > > > > >
> > > > > > > > 1. In multi-modal data processing scenarios, the operations are mostly
> > > > > > > > reading and writing files, images, audio and video. From the current API
> > > > > > > > perspective, these operations can only be added manually via read_custom and
> > > > > > > > write_custom. How are the read/write APIs for these types designed?
> > > > > > > >
> > > > > > > > 2. Currently, batch_size is controlled by row count in map_batches.
> > > > > > > > However, the per-row size of multimodal data varies dramatically: a single
> > > > > > > > 4K image can be up to 20 MB, while a piece of text may only be 100 B.
> > > > > > > > Splitting batches purely by row count may lead to OOM. Should we support
> > > > > > > > splitting batches by memory budget too?
> > > > > > > >
> > > > > > > > 3. GPU computing is a common requirement in multimodal processing, but I don't
> > > > > > > > see any related information in this set of APIs, such as
> > > > > > > > map/map_batches, etc. How can we set GPU/CPU resources and
> > > > > > > > specifications for them and for UDFs?
> > > > > > > >
> > > > > > > > 4. In addition, users may need to load local models within map/map_batches
> > > > > > > > for data processing. The current APIs only support the callback format.
> > > > > > > > Should class types also be supported? This way, I/O and other operations
> > > > > > > > only need to be executed once for a class instance.
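
A minimal sketch of the memory-budget idea from point 2, assuming a batch is
closed when adding the next item would exceed either a row limit or a byte
limit. `batch_by_budget` and its parameters are hypothetical, and `len(item)`
on raw bytes stands in for a real per-row size estimate.

```python
from typing import Iterable, Iterator


def batch_by_budget(
    items: Iterable[bytes], max_rows: int, max_bytes: int
) -> Iterator[list[bytes]]:
    """Yield batches bounded by both row count and total payload size."""
    batch: list[bytes] = []
    size = 0
    for item in items:
        # Close the current batch if this item would break either limit.
        if batch and (len(batch) >= max_rows or size + len(item) > max_bytes):
            yield batch
            batch, size = [], 0
        batch.append(item)
        size += len(item)
    if batch:
        yield batch


payloads = [b"x" * 100, b"y" * 100, b"z" * 1000, b"w" * 50]
batch_sizes = [len(b) for b in batch_by_budget(payloads, max_rows=10, max_bytes=300)]
```

An item larger than the budget still gets a batch of its own, so the byte
limit here is a target and not a hard guarantee.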
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > FangYong
> > > > > > > >
> > > > > > > > On Thu, Jun 18, 2026 at 7:13 AM Zander Matheson <[email protected]> wrote:
> > > > > > > >
> > > > > > > > > Very nice proposal, Dian Fu, thank you for putting this together!
> > > > > > > > >
> > > > > > > > > It feels like it supersedes FLIP-541
> > > > > > > > > <https://urldefense.com/v3/__https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=378473217__;!!Ayb5sqE7!uTFyNJs_HvqmbYdAXr6pKTnZKQhtkZQWSSJRTiLnzEkWShqQAAG743RK5OXkDxDsm_MlGd01dWhS-Ui-U5bXdiY$>
> > > > > > > > > since it will solve many of the problems we initially discussed related to making
> > > > > > > > > Flink more Pythonic.
> > > > > > > > >
> > > > > > > > > I had some questions and comments on the FLIP, shared below. Overall I
> > > > > > > > > really like the design you have proposed and look forward to collaborating,
> > > > > > > > > hopefully!
> > > > > > > > >
> > > > > > > > > *Pandas/PySpark/Polars API Parity*
> > > > > > > > > What are your thoughts on parity with some of the established APIs? I see
> > > > > > > > > that the FLIP mentions both lowering the barrier to entry for Python
> > > > > > > > > users who are familiar with DataFrame APIs and a non-goal of
> > > > > > > > > providing full compatibility with Pandas, Polars, etc. Below are a couple of
> > > > > > > > > common DataFrame API conventions that we could potentially align with more closely.
> > > > > > > > >
> > > > > > > > >    1. dropna and fillna instead of drop_null/fill_null. Python does not
> > > > > > > > >    have a Null concept, so it might make sense to keep the dropna used in
> > > > > > > > >    other dataframe libraries?
> > > > > > > > >    2. show(), .display(). How to peek at the data? This is a common pattern in
> > > > > > > > >    development.
> > > > > > > > >    3. sort(by, descending=False). This boolean flag is the opposite of what is
> > > > > > > > >    used in other libraries.
> > > > > > > > >    4. map() in Dataframe land is an element-wise computation, do we want to
> > > > > > > > >    break with that? I believe apply() is more similar. That being said, map as
> > > > > > > > >    intended from map reduce is more akin to the row-wise model shown. More a
> > > > > > > > >    question than anything else.
> > > > > > > > >    5. Index - I didn’t see any mention of indices in the document, is this
> > > > > > > > >    something we are explicitly avoiding? I have often used indices for
> > > > > > > > >    selecting groups of rows or columns for manipulation.
> > > > > > > > >    6. In_place - this may not be possible with the execution style, but is
> > > > > > > > >    there a possibility of having in_place booleans for things like unique or
> > > > > > > > >    sort, where the output can either be done in place or needs to be
> > > > > > > > >    assigned to a new dataframe?
> > > > > > > > >    7. to_json/from_json and .values().
> > > > > > > > >    8. Properties - Do we want to add some of the niceties of spark/pandas
> > > > > > > > >    here if possible? df.shape, df.info, df.describe
> > > > > > > > >
> > > > > > > > > *Execution Model*
> > > > > > > > > I like the idea of having triggers for the execution on
> > > > > > > > > write and materialization (to_pandas etc.). This is probably the meat of
> > > > > > > > > the problem for creating a dataframe API that feels like the other commonly
> > > > > > > > > used dataframe APIs. The balance between write and providing the
> > > > > > > > > statement_set to make the writes coordinated seems nice. But is there also
> > > > > > > > > potentially a world where we want to expose the ability to arbitrarily
> > > > > > > > > trigger an execution? Say for Exploratory Data Analysis in a notebook? This
> > > > > > > > > seems possible with to_pandas and to_list etc., but maybe having a lower
> > > > > > > > > level primitive that could be called could be a good escape hatch?
> > > > > > > > >
> > > > > > > > > *Exploratory Data Analysis*
> > > > > > > > > One of the biggest use cases for dataframes is
> > > > > > > > > exploratory data analysis. Is this something we want to encourage with this
> > > > > > > > > FLIP? It might make sense to add some of the EDA methods for that purpose.
> > > > > > > > > See above, show/display and execution trigger.
> > > > > > > > >
> > > > > > > > > *Common Use Cases*
> > > > > > > > > Related to the EDA note above, I am curious what you envision as the most
> > > > > > > > > common use cases for this API.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Zander
> > > > > > > > >
> > > > > > > > > On Wed, Jun 17, 2026 at 2:42 AM Cole Bailey via dev <[email protected]> wrote:
> > > > > > > > >
> > > > > > > > > > Thanks Dian,
> > > > > > > > > >
> > > > > > > > > > There is a lot of good work here that aligns with what we have been
> > > > > > > > > > brainstorming for a better PyFlink experience.
> > > > > > > > > >
> > > > > > > > > > One ergonomic suggestion I'd love to see included is supporting pythonic
> > > > > > > > > > aliasing via kwargs within `.agg` similar to what is already outlined in
> > > > > > > > > > `with_columns` or `select`:
> > > > > > > > > >
> > > > > > > > > > The example would instead look like this:
> > > > > > > > > >
> > > > > > > > > > df.group_by("dept").agg(
> > > > > > > > > >     avg_salary=col("salary").avg(),
> > > > > > > > > >     headcount=col("id").count(),
> > > > > > > > > > )
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Another nice-to-have would be flexibility in column referencing. I see 2
> > > > > > > > > > variations scattered throughout the FLIP:
> > > > > > > > > >
> > > > > > > > > > col("age")
> > > > > > > > > >
> > > > > > > > > > df["age"]
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Both of these make sense. I think we should also consider supporting attr
> > > > > > > > > > style column references since these can be reused across the lambda or
> > > > > > > > > > subscript filtering examples already in the FLIP:
> > > > > > > > > >
> > > > > > > > > > df.age
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > That would then give us this representative example:
> > > > > > > > > >
> > > > > > > > > > df.group_by("dept").agg(
> > > > > > > > > >     avg_salary=df.salary.avg(),
> > > > > > > > > >     headcount=df.id.count(),
> > > > > > > > > > )
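
A minimal sketch of how attribute-style references and kwargs aliasing could
fit together, assuming `__getattr__` falls back to a column lookup. `Frame`,
`Col`, and `agg` are hypothetical stand-ins, and the tuples stand in for real
aggregate expressions.

```python
class Col:
    """Hypothetical column reference that builds tiny expression tuples."""

    def __init__(self, name: str) -> None:
        self.name = name

    def avg(self) -> tuple[str, str]:
        return ("avg", self.name)

    def count(self) -> tuple[str, str]:
        return ("count", self.name)


class Frame:
    """Hypothetical frame supporting df["age"] and df.age."""

    def __init__(self, columns: list[str]) -> None:
        self._columns = list(columns)

    def __getitem__(self, name: str) -> Col:
        if name not in self._columns:
            raise KeyError(name)
        return Col(name)

    def __getattr__(self, name: str) -> Col:
        # Only reached when normal attribute lookup fails; private names are
        # rejected first so a missing _columns cannot cause recursion.
        if name.startswith("_") or name not in self._columns:
            raise AttributeError(name)
        return Col(name)


def agg(**aliases: tuple[str, str]) -> dict[str, tuple[str, str]]:
    # Keyword names become the output column aliases.
    return dict(aliases)


df = Frame(["id", "dept", "salary"])
spec = agg(avg_salary=df.salary.avg(), headcount=df["id"].count())
```

One design caveat: a column whose name matches an existing method or attribute
cannot be reached through `df.name`, so the subscript form stays necessary as
the general fallback.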
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > > Cole
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Wed, Jun 17, 2026 at 6:12 AM Dian Fu <[email protected]> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi all,
> > > > > > > > > > >
> > > > > > > > > > > I would like to start a discussion about FLIP-591: Introducing Python
> > > > > > > > > > > DataFrame API in PyFlink [1].
> > > > > > > > > > >
> > > > > > > > > > > This FLIP is a sub-FLIP of the broader direction discussed in
> > > > > > > > > > > FLIP-577 (AI-Native Flink — An Umbrella Proposal for Multimodal Data
> > > > > > > > > > > Processing) [2]. It proposes a new public Python module,
> > > > > > > > > > > `pyflink.dataframe`, as a DataFrame-style API on top of the existing
> > > > > > > > > > > PyFlink Table API. The goal is not to introduce a new execution model,
> > > > > > > > > > > but to provide a more natural Python-facing entry point for users
> > > > > > > > > > > coming from the broader Python data ecosystem, while preserving Flink
> > > > > > > > > > > semantics and execution capabilities.
> > > > > > > > > > >
> > > > > > > > > > > The proposal focuses on:
> > > > > > > > > > >     - Designing a Python-friendly DataFrame API for PyFlink, including
> > > > > > > > > > > the API shape itself, a more user-friendly DataType design, unified
> > > > > > > > > > > configuration, reduced TableEnvironment boilerplate, and a practical
> > > > > > > > > > > multiple-sink model for end-to-end pipelines
> > > > > > > > > > >     - Providing ergonomic support for row-oriented Python
> > > > > > > > > > > transformations, including map / map_batches style operations for
> > > > > > > > > > > enrichment, feature engineering, and AI/ML workloads
> > > > > > > > > > >     - Exposing concurrency configuration so that expensive Python
> > > > > > > > > > > stages can be scaled independently, making it easier to build
> > > > > > > > > > > practical jobs directly with the DataFrame API
> > > > > > > > > > >     - Supporting Arrow as a first-class batch format for efficient
> > > > > > > > > > > interoperability with the Python ecosystem
> > > > > > > > > > >
> > > > > > > > > > > The Design Decisions section discusses the main design considerations
> > > > > > > > > > > behind the proposal and may be a useful place to pay extra attention
> > > > > > > > > > > when reviewing it.
> > > > > > > > > > >
> > > > > > > > > > > Looking forward to your feedback!
> > > > > > > > > > >
> > > > > > > > > > > Regards,
> > > > > > > > > > > Dian
> > > > > > > > > > >
> > > > > > > > > > > [1]
> > > > > > > > > >
> > > > [message truncated...]
>
